Fluency Processing Language

Why FPL?

FPL is a programming language designed for data analysis and data visualization. The primary goal of FPL is to solve the problem of API integration.

The current industry standard language is SQL, which was designed for relational databases. On top of SQL, there are numerous "query languages" designed for specific data sources. For example, Splunk has SPL, AWS has CloudWatch Query Language, Azure has Kusto Query Language, etc. However, none of these languages is a full programming language. For example, they do not support if-else statements, for loops, real function calls, etc.

FPL supports all the features of a modern programming language. It also supports a rich set of utility functions and methods to simplify data analysis and data visualization.

Data Types

Primitive Value

  • string
  • blob
  • int64
  • float64
  • bool
  • null
  • list of primitive value // []_primitive_value
    • 0-indexed
  • map of primitive value // map[string]*primitive_value
  • list of interface // Json Array []interface{}
  • map of interface // Json Object map[string]interface{}
  • undefined

Object Types

Error

  • new Error(message, name)
  • name: "Error", "RangeError", "InternalError", "ReferenceError", "SyntaxError", "TypeError"
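An illustrative sketch, based on the throw/catch usage shown later in this document:

```
try {
  throw new Error("invalid data type")
} catch (e) {
  printf("%s: %s", e.name, e.message) // Error: invalid data type
}
```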

Time

  • new Time() // create a time object. The value is set to current timestamp
  • new Time(epoch) // create a time object. The epoch could be second or millisecond
  • new Time(relativeTime) // create a time object from a relative time
  • new Time(layout, value, location) // parse value using the layout; location is optional
  ANSIC       = "Mon Jan _2 15:04:05 2006"
	UnixDate    = "Mon Jan _2 15:04:05 MST 2006"
	RubyDate    = "Mon Jan 02 15:04:05 -0700 2006"
	RFC822      = "02 Jan 06 15:04 MST"
	RFC822Z     = "02 Jan 06 15:04 -0700" // RFC822 with numeric zone
	RFC850      = "Monday, 02-Jan-06 15:04:05 MST"
	RFC1123     = "Mon, 02 Jan 2006 15:04:05 MST"
	RFC1123Z    = "Mon, 02 Jan 2006 15:04:05 -0700" // RFC1123 with numeric zone
	RFC3339     = "2006-01-02T15:04:05Z07:00"
	RFC3339Nano = "2006-01-02T15:04:05.999999999Z07:00"
	Kitchen     = "3:04PM"
	// Handy time stamps.
	Stamp      = "Jan _2 15:04:05"
	StampMilli = "Jan _2 15:04:05.000"
	StampMicro = "Jan _2 15:04:05.000000"
	StampNano  = "Jan _2 15:04:05.000000000"
	DateTime   = "2006-01-02 15:04:05"
	DateOnly   = "2006-01-02"
	TimeOnly   = "15:04:05"
  • location is optional: If the name is "" or "UTC", the timezone is set to "UTC". Otherwise, the name is taken to be a location name corresponding to a file in the IANA Time Zone database, such as "America/New_York"
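An illustrative sketch of the constructor forms described above (the specific dates are arbitrary):

```
let now = new Time()                     // current timestamp
let t1 = new Time(1692330097)            // from an epoch in seconds
let t2 = new Time("-24h")                // 24 hours before now (relative time)
let t3 = new Time("2006-01-02", "2023-08-17")  // parse with the DateOnly layout (UTC)
let t4 = new Time("2006-01-02 15:04:05", "2023-08-17 10:00:00", "America/New_York")
```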

Lambda

  • simple expression: (x, y) => x+y
  • block statements: explicit return is required
  (x, y) => {
      let a = x + 1
      return a + y
  }

  ** the lambda parameter supports object/array destructuring:

  // obj is a map/object type variable
  (obj) => obj.field1 + obj.field2
  // OR
  ({field1, field2}) => field1 + field2
  • FplTable
    new Table(col1,col2,...)
    Columns
    ** Rows

  • FplMetric
    from,to
    interval
    dimensions
    Metrics

  • FplAlert

  • Tuple: list of Data Types or Object Types

  • Map: map of Data Types or Object Types

Operators

  • binary operators:

    arithmetic operators:
    '+'  : addition
    '-'  : subtraction
    '*'  : multiplication
    '/'  : float division
    '%'  : modulo

    bitwise operators:
    '&'  : bitwise AND
    '|'  : bitwise OR
    '^'  : bitwise exclusive OR
    '>>' : right shift
    '<<' : left shift

    relational operators:
    '==' : equality
    '!=' : inequality
    '<'  : less than
    '>'  : greater than
    '<=' : less or equal
    '>=' : greater or equal

    logical operators:
    '&&' : and
    '||' : or
    '!'  : not

  • conditional (ternary) operator:
    condition ? trueValue : falseValue

  • optional chaining (?.)
    The optional chaining (?.) operator accesses an object's property or calls a function. If the object's property or function is undefined or null, the expression evaluates to undefined instead of throwing an error.
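A short illustration of the ternary and optional chaining operators:

```
let obj = {a: {b: 10}}
let v1 = obj.a?.b                      // 10
let v2 = obj.x?.y                      // undefined instead of throwing an error
let label = v1 > 5 ? "big" : "small"   // "big"
```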

Control Blocks

  • if/elseif/else support
    ** value to bool conversion: false, null, undefined, 0, and "" are false; all other values are true
let s = 100
if !s {
  printf("s has a false value")
} elseif s > 100 {
  printf("s is greater than 100")
} else {
  printf("s is less than or equal to 100")
}
  • for loop support
    for index, value = range list { }
    for key, value = range map { }
    the for range loop also applies to utf8 encoded strings
    ** in this case, the index of the loop is the starting position of the current rune, measured in bytes. see the example below
let lst = [0, 10, 20]
for i, v = range lst {
  printf("index: %d:  value: %d", i, v)
}

let map = {x:0, y:10, z:20}
for k, v = range map {
  printf("key: %s:  value: %d", k, v)
}

// apply for utf8 encoded string
let nihongo = "日本語"
for i, s = range nihongo {
  printf("i:%d  s:%s", i, s)
}
// i:0  s:日
// i:3  s:本
// i:6  s:語
  • for loop with three components: for init?; condition?; post? { }
let list = [0, 10, 20]
for let i = 0; i < len(list); i++ {
  printf("index: %d:  value: %d", i, list[i])
}
  • break
    ** break out of the current for loop

  • continue
    ** skip the current iteration of the for loop

  • throw
    ** throw new Error("invalid data type")

  • try { } catch () {} finally {}

try {
  nonExistentFunction();
} catch (e) {
  printf("%s: %s", e.name, e.message);
  // print out: ReferenceError: nonExistentFunction is not defined
} finally {
  // execute after the try block and catch block(s) execute, 
  // but before the statements following the try...catch...finally block
}

  • return

  • comments
    single-line comments //
    multi-line comments /* */

Function

  • function (parameters) { }

  • function main() {}
    ** main function is the execution starting point

Utility Library (both input and output are primitive values)

  • toLower(string) => string
    ** returns the string in lowercase
toLower("HELLO") // return the string "hello"
toLower(" World") // return the string " world"
  • toUpper(string) => string
    ** returns the string in uppercase
toUpper("hello") // return the string "HELLO"
toUpper("wORld") // return the string "WORLD"
  • startsWith(string, prefix) => bool
    returns true if string starts with prefix, false otherwise
    is case and whitespace sensitive
let s = "hello"
startsWith("hello", "he") // return true
startsWith("hello", "He") // return false
  • endsWith(string, suffix) => bool
    returns true if string ends with suffix, false otherwise
    is case and whitespace sensitive
let s = "hello"
endsWith("hello", "llo") // return true
endsWith("hello", "LLO") // return false
  • contains(string, subString) => bool
    returns true if subString exists in string, false otherwise
    is case and whitespace sensitive
let s = "hello"
contains("hello", "ello") // return true
contains("hello", "hi") // return false
contains("hello", "He") // return false
  • content(string1, string2) => bool
    returns true if string1 equals string2, false otherwise
    is case and whitespace sensitive
let s = "hello"
content(s, "hello") // return true
content(s, "Hello") // return false
content(s, "hello ") // return false
  • trim(s, cutset) => string
    returns a slice of the string s with all leading and trailing Unicode code points contained in cutset removed.
    cutset is treated as a set of characters
let s = "Hello and Hello"
trim(s, "Hello") // return the string " and " (the spaces are not in the cutset)
trim(s, "o leH") // return the string "and"
trim(s, "Hel") // return the string "o and Hello"
  • trimPrefix(s, prefix) => string
    returns s without the provided leading prefix string. If s doesn't start with prefix, s is returned unchanged.
    is case and whitespace sensitive
let s = "Hello World"
trimPrefix(s, "Hello ") // return the string "World"
trimPrefix(s, "hello") // return the string "Hello World"
  • trimSuffix(s, suffix) => string
    returns s without the provided trailing suffix string. If s doesn't end with suffix, s is returned unchanged.
    is case and whitespace sensitive
let s = "Hello World"
trimSuffix(s, "World") // return the string "Hello "
trimSuffix(s, "Hello") // return the string "Hello World"
  • split(s, delim)
    ** split the input string on delim and return a list of strings
let s = "1,2,3"
split(s, ",") // return a list ["1", "2", "3"]
split(s, "2") // return a list ["1,", ",3"]
split(s, "1") // return a list ["", ",2,3"]
  • indexOf(s, substring)
    returns the index of the first instance of a substring in a given string.
    return -1 if the substring is not found.
let s = "abcd"
let i = indexOf(s, "b")
let j = indexOf(s, "n")
printf("i=%d  j=%d", i, j)
// i: 1  j:-1
  • subString(s, start, end)
    ** extracts the substring from start (inclusive) to end (exclusive)
let s = "abcd"
let sub = subString(s, 1, 2)
printf("subString=%s", sub)
// subString=b
  • parseInt(s, base)
    parse a string in the given base into a 64bit integer
    if base is not given, it will default to 0
    ** if the base argument is 0, the true base is implied by the string's prefix (if present): 2 for "0b", 8 for "0" or "0o", 16 for "0x", and 10 otherwise
let s = "10"
parseInt(s) // return the int64 value of 10
parseInt(s, 2) // return the int64 value of 2

let s = "0b10"
parseInt(s) // return the int64 value of 2
  • parseFloat(s)
    ** parse a string into a 64bit floating-point number
parseFloat("10") // return the float64 value of 10.0
parseFloat("10.11") // return the float64 value of 10.11
  • parseBool(s)
    returns the boolean value represented by the string.
    it accepts 1, t, T, TRUE, true, True, 0, f, F, FALSE, false, False. Any other value returns undefined
parseBool("1") // return the bool value true
parseBool("f") // return the bool value false
parseBool("fa") // return undefined
  • parseJson(text)
    parse a JSON string
    return JsonObject, JsonArray, string, float, int, bool or null value
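An illustrative sketch of parsing a JSON string and reading the result:

```
let j = parseJson(`{"name": "foo", "count": 2, "tags": ["a", "b"]}`)
printf("%s", j.name)       // foo
printf("%d", len(j.tags))  // 2
```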
  • coalesce(var1, var2, var3, ...)
    ** return the first argument that is a non-empty string value, undefined otherwise
coalesce("str1", "str2", "str3", ...) // return the string "str1"
coalesce("", 15, "str3", ...) // return the string "str3"
coalesce("", "", "") // return undefined
  • isValidIP(s)
    ** return true if the input string is a valid ip address
  • isIPv4(s)
    ** return true if the input string is a valid IPv4 address
  • isIPv6(s)
    ** return true if the input string is a valid IPv6 address
  • ipNormalize(s)
    ** return normalized IP address.
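Illustrative calls of the IP helpers described above (the addresses are arbitrary):

```
isValidIP("10.1.2.3")  // return true
isValidIP("10.1.2")    // return false
isIPv4("10.1.2.3")     // return true
isIPv6("10.1.2.3")     // return false
isIPv6("::1")          // return true
```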
  • timezoneOffset(timezone)
    ** return timezone offset in seconds against the UTC timezone
 printf("America/New_York offset: %d", timezoneOffset("America/New_York"))
 printf("Europe/Berlin offset: %d", timezoneOffset("Europe/Berlin"))

 // America/New_York offset: -14400
 // Europe/Berlin offset: 7200

  • replace(s, old, new, count)
    returns a copy of the given string with the first 'count' non-overlapping instances of the old string replaced by the new one
    s: the input string
    old: the string to be replaced
    new: the string that replaces the old one
    count: the maximum number of times the old string will be replaced.
    if count is less than zero, there is no limit on the number of replacements
let s = "a a a"
replace(s, "a", "Hello", 1) // return the string "Hello a a"
replace(s, "a", "Hello", 0) // return the string "a a a"
replace(s, "a", "Hello", -1) // return the string "Hello Hello Hello"
  • replaceAll(s, regexp, replacement, count)
    returns a copy of s, replacing matches of the regexp with the replacement text. Inside the replacement, $ signs are interpreted as in Go's Regexp.Expand, so for instance $1 represents the text of the first submatch.
    if count is less than zero, there is no limit on the number of replacements
let s = "'foo' 'bar'"
let s2 = replaceAll(s, "'([^']*)'", "${1}", -1)
printf("s2=%s", s2)
// s2=foo bar
  • match(pattern, s)
    return true if the input string s contains any match of the regular expression pattern.
    use the ^ and $ anchors to require that the regex pattern match the full input string.
let s = "Hello"
match("^H", s) // return true since s starts with "H"
match("^h", s) // return false since s does not start with "h"
  • regexp(pattern, s)
    ** this function extracts the captured "named groups" matching the regular expression pattern from s.
let Email = "foo@gmail.com"
let obj = regexp("(?P<Name>.*)@(?P<Domain>.*)", Email) // sets obj to {Name: "foo", Domain: "gmail.com"}
let {Name, Domain} = regexp("(?P<Name>.*)@(?P<Domain>.*)", Email) // sets the var Name = "foo" and Domain = "gmail.com"
let obj = regexp("(?P<Name>.*)@(?P<Domain>.*)", "foo") // return undefined
  • len(variable)
    if variable is primitive string, returns the length of the input string
    if variable is primitive list, returns the length of the list
    if variable is primitive map, returns the number of key-value pairs in the map
    if variable is json array, returns the number of elements in the array
    if variable is json object, returns the number of key-value pairs in the object
    if variable is Tuple, returns the number of elements in the tuple
    if variable is Map, returns the number of key-value pairs in the map
    if variable is Table, returns row count of the table
    if variable is MetricStream, returns the number of data series in the metric
    if variable is Alert, returns the number of entries in the alert
    ** else return 0
len("Hello") // return an int64 value of 5
len([1, 2, 3]) // return an int64 value of 3
len({Name: "foo", Domain: "@gmail.com"}) // return an int64 value of 2
  • append(list, element)
    if list is primitive string and element is primitive string, return a new string.
    if list is primitive list type and element is primitive value, appends element to the primitive value list
    if list is primitive json type and element is primitive value, appends element to the json array
    if list is tuple type, append element to the tuple
    ** else return error
let s = "ab"
s = append(s, "cd") // s is now the string "abcd"

let src = [1, 2, 3, 4]
append(src, 5) // src is still [1, 2, 3, 4] because the return value is not assigned back
src = append(src,5) // src is now [1, 2, 3, 4, 5]
  • concat(list1, list2)
    ** merge two list type input into one new list
let a = [1, 2]
let b = [3, 4]
let c = concat(a, b) // c is now [1, 2, 3, 4]
  • delete(map, key)
    map must be primitive map, jsonObject or object map.
    key must be primitive string
let m = {first: 10, second: 20}
delete(m, "first") // m is now the map {second: 20}
  • setEnv(name, value)
    ** set the environment variable $name to value
  • getEnv(name)
    ** get the environment variable $name
setEnv("from", "-24h")
setEnv("to", "@h")
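getEnv reads a value previously set with setEnv; an illustrative sketch:

```
setEnv("from", "-24h")
let from = getEnv("from") // return the string "-24h"
```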
  • typeof(variable)
    if variable is a primitive value, returns the type of the primitive value:
    "string", "int64", "float64", "bool", "null", "undefined", "list", "map", "jsonObj", "jsonArray"
    else returns the type of the object:
    "Tuple", "Map", "Lambda", "Table", "MetricStream", "Alert"
typeof(2) // return the string "int64"
typeof([1, 2, 3]) // return the string "list"
  • isNull(var)
    ** return true if var is a null type, false otherwise
isNull("Hello") // return false
isNull(null) // return true
  • isUndef(var)
    ** return true if var is undefined type, false otherwise
isUndef(null) // return false

let s = coalesce("", "", "") // return undefined
isUndef(s) // return true
  • isString(var)
    ** return true if var is of string type, false otherwise
isString("abc") // return true
isString(64) // return false
  • isBlob(var)
    ** return true if var is of blob type, false otherwise
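An illustrative sketch, assuming base64Decode returns a blob as described later in this section:

```
let b = base64Decode("aGVsbG8=") // decoded blob of "hello"
isBlob(b)        // return true
isBlob("hello")  // return false
```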

  • isNumber(var)
    ** return true if var is of int64 or float64 type, false otherwise

isNumber("abc") // return false
isNumber(64) // return true
  • jsonClone(jsonValue)
    ** return a deep copy of the input json object or array
let a = {"string":"abc", "int": 1, "float": 2.01, "bool": true, "null":null, "array":[1,2,3], "map":{"foo":"bar"}}
let b = jsonClone(a)
printf("%s",b)
// {"array":[1,2,3],"bool":true,"float":2.01,"int":1,"map":{"foo":"bar"},"null":null,"string":"abc"}
  • toString(variable)
    ** convert the input variable to a string (string|blob|int64|float64|bool|null|undefined|jsonObj|jsonArray)
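Illustrative conversions:

```
toString(10)    // return the string "10"
toString(10.5)  // return the string "10.5"
toString(true)  // return the string "true"
```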

  • base64Encode(blob|string)
    ** base64 encode the input string or blob

  • base64Decode(string)
    ** base64 decode the input string and return the decoded string as a blob

let s = "Hello World"
let enc = base64Encode(s)
printf("enc=%s", enc)
let dec = base64Decode(enc)
printf("dec=%s", toString(dec))
  • gzipCompress(blob|string)
    ** gzip compress the input string or blob
  • gzipDecompress(blob)
    ** gzip decompress the input blob and return the decompressed as a blob
let s = "Hello World"
let gzBlob = gzipCompress(s)
let b = gzipDecompress(gzBlob)
printf("%s", toString(b))
  • run(lambda, arguments...)
    ** run lambda function with optional arguments
let printLabel = () => {
  printf("hello world")
}
run(printLabel)

let inc = (i) => {
  return i+1
}    
printf("inc %d", run(inc, 2))
  • sprintf(format, arguments...)
    golang's printf format; returns the formatted string
    if format is not given, it defaults to string
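sprintf returns the formatted string instead of printing it; an illustrative sketch:

```
let msg = sprintf("%s: %d items", "cart", 3)
printf("%s", msg) // cart: 3 items
```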

  • printf(format, arguments...)
    golang's printf format; prints the formatted string to traces
    if format is not given, it defaults to string
    format specifiers:
    %v : formats the value in a default format
    %d : formats decimal integers
    %f : formats floating-point numbers
    %g : formats floating-point numbers and removes trailing zeros
    %b : formats base 2 numbers
    %o : formats base 8 numbers
    %t : formats true or false values
    %s : formats string values

printf("%d", 2) // prints 2 as a string to traces
printf(2) // ERROR: expected string but int64 given
printf("2") // prints the string 2 to traces
  • case(condition_1, value_1, [condition_2, value_2, ...], default_value)
    ** evaluate a list of conditions and returns the first value whose condition is evaluated to true. If all conditions are false, the default value is returned
let i = 10
case(i>10, "bigger than ten", i>=0, "positive", "negative") // return "positive"
let i = -10
case(i>10, "bigger than ten", i>=0, "positive", "negative") // return "negative"
  • template(text, variableMap)
    generate text output based on input variables
    the template format is the same as Golang's template
    ** the variableMap is a map type holding variables, or a json object
let t = `Value of a: {{.fields.a}} 
         List: {{.list}}`
let opt = {"fields":{"a":"foo"}, "list": [1234, 5678]}
let s = template(t, opt)
//   Value of a: foo 
//   List: [1234 5678]
  • jsonTable(array)
    ** generate a table from a literal array expression.
let arr = [
  {ID: "a", Col1: "x"},
  {ID: "b", Col2: "y"}
]

let t = jsonTable(arr)
  • mergeTable(table1, table2..)
    ** generate a new table by merging input tables

.Table t1

ID  City           State     Country
1   Rockville      Maryland  US
2   Silver Spring  Maryland  US
3   Baltimore      Maryland  US

.Table t2

ID  City           State       Country
4   Seattle        Washington  US
5   Bellevue       Washington  US
6   Spokane        Maryland    US

let t3 = mergeTable(t1, t2) // t3 is a new table with data from t1 followed by t2

.Table t3 resulting from the mergeTable function call

ID  City           State       Country
1   Rockville      Maryland    US
2   Silver Spring  Maryland    US
3   Baltimore      Maryland    US
4   Seattle        Washington  US
5   Bellevue       Washington  US
6   Spokane        Maryland    US

System Functions

  • pluginLambda(pluginType, customers, (customer) => {})
    ** call registered plugin to run FPL lambda
let threats = pluginLambda("Cylance", "*", (customer) => {      
       let threats = Cylance_LoadThreat((obj) => {
          let {sha256:ID, md5, name, classification, sub_classification} = obj
          return {ID, name, classification, sub_classification, customer}
       })
       return {threats}
})      
  • AWS_AccountRegionLambda(accounts, regions, (account, region) => { return {} })
    Run lambda function on specific AWS accounts and regions
    accounts: "*" enables all configured AWS accounts. Accounts could also be one account name or an array of names
    accounts: "Production" or ["Production", "UnitTest"]
    regions: "*" enables all configured regions. Regions could also be one region name or an array of names
    regions: "us-east-1" or ["us-east-1", "us-east-2"]
    this function returns a map of objects
    ** results from different regions will be merged into one
// enabling only the Production account from the region us-east-1
AWS_AccountRegionLambda("Production","us-east-1", (account, region) => {
  /*
    code block
  */
  return {table1, table2, ...}
})

// enabling all configured accounts from all configured regions
AWS_AccountRegionLambda("*","*", (account, region) => {
  /*
    code block
  */
  return {table1, table2, ...}
})
  • AWS_AccountLambda(accounts, (account) => { return {} })
    ** run lambda function on specific AWS accounts (one example is the AWS Cost and Usage API, which is not limited to one specific region)

  • transform(stream, lambda)
    create a new stream. The data series of the new stream is the result of the lambda function.
    lambda function interface: (ts, key, value) => { }

let duration = AWS_GetMetric("Duration", options, filters)
let invocation = AWS_GetMetric("Invocations", options, filters)
let durationCost = transform(duration, (ts, key, value) => (value/1000) * assetTable[key].lambdaMemoryRate)
let invocationCost = transform(invocation, (ts, key, value) => value * assetTable[key].lambdaRequestRate)
  • anomaly(stream, {seasonal:"auto", minDiff: 3.0, minDiffPercent: 10.0})
    anomaly detection on one stream
    seasonal: auto | weekday-end-hourly | hourly | weekday-hourly | ""
    minDiff: absolute difference over mean: abs(value - mean)
    minDiffPercent: relative percent over mean: (value - mean)/mean
    ** return FplAlert object
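An illustrative sketch, reusing the AWS_GetMetric pattern from the transform example above (the options and filters variables are assumed to be defined as in that example):

```
let duration = AWS_GetMetric("Duration", options, filters)
let durationAlerts = anomaly(duration, {seasonal:"auto", minDiff: 3.0, minDiffPercent: 10.0})
return {durationAlerts}
```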

  • RxFPL_GetMetric(metricName, {options} )
    Load metric from rxfpl database
    from: range from
    to: range to
    order: desc | asc // default is desc
    limit: number of metrics // default is 10
    filters: search filters

function main() {
  let cost = RxFPL_GetMetric("PureCloudOps.AWS.Billing.InstanceCost", {from:"-2h@h", to:"@h", filters:[{name:"lvdb-app", values:"archiveSearchV3", exclude: true}]})
  return {cost}
}
  • alert(stream, window(condition, n, m))
    ** sliding window detection
function queueAlerts(queues) {
  let options = {from: "-1h@h", to: "@h", dimensions: ["QueueName"], namespace: "AWS/SQS", period: "5m", stat: "Maximum", unit:"Second"}
  let filters = {QueueName: queues}
  let ages = AWS_GetMetric("ApproximateAgeOfOldestMessage", options, filters)
  let ageAlerts = alert(ages, window(ages > 3600, 2, 2))
  options.stat = "Sum"
  let received = AWS_GetMetric("NumberOfMessagesReceived", options, filters)
  options.stat = "Average"
  let queueLength = AWS_GetMetric("ApproximateNumberOfMessagesVisible", options, filters)
  let consumerStopAlerts = alert(queueLength, window(received == 0 && queueLength > 1, 2, 2))
  return {ageAlerts, consumerStopAlerts}
}

Object Methods

Collection Methods

  • Collection Methods apply to the following data types:
    Tuple, Map, List of Primitives, Map of Primitives, JsonArray and JsonObj
    All collection methods take a lambda as argument:
    (k, v) => {} for map type collections
    (i, v) => {} for list type collections
  • Each()
    ** iterate through the collection and apply the lambda function
let arr = [1, 2, 3]
arr.Each((i, v) => {
  printf("index: %d:  value: %d", i, v)
})
  • Map()
    ** return a new list populated with the results of calling the provided lambda on every element in the calling collection
  • Filter()
    ** return a new list, keeping only the elements for which the provided lambda returns true
  • Some()
    ** return true if the provided lambda returns true for at least one element
  • Find()
    return the first element for which the provided lambda returns true
    return undefined otherwise
let arr = [
  {ID: "a", Col1: "x"},
  {ID: "b", Col1: "y"}
]
let f = arr.Find((_,e) => e.Col1 == "x")
if (f) {
  printf("Find: %v", f.ID)
}
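Map, Filter and Some take the same lambda signatures as Each and Find; an illustrative sketch:

```
let arr = [1, 2, 3]
let doubled = arr.Map((i, v) => v * 2)       // [2, 4, 6]
let odds = arr.Filter((i, v) => v % 2 == 1)  // [1, 3]
let hasBig = arr.Some((i, v) => v > 2)       // true
```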
  • Table()
    ** create a table object.
let s = `
[ 
  {"ID": "a",  "Col1":"foo"  },
  {"ID": "b",  "Col2":"bar"  }
]`
let j = parseJson(s)
let t1 = j.Table( (_, obj) => {
   return {ID:obj.ID, Col1:obj.Col1, Col2:obj.Col2}
})
  • Join(delim)
    ** only apply to list type collections
let arr = [1, 2, 3]
let s = arr.Join(",") // s is now the string "1,2,3"

Time Methods

  • Format(layout)
  • Add(relativeTime)
  • Before(time)
  • After(time)
    ** return true or false
  • Round(duration)
    Round returns the result of rounding to the nearest multiple of duration. The rounding behavior for halfway values is to round up
    Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
  • Unix()
    ** return epoch time in seconds
  • UnixMilli()
    ** return epoch time in milliseconds
let t = new Time()
printf("%s", t)           // 2023-08-17T23:41:37-04:00
let t2 = t.Add("-1h")
printf("%s", t2)         // 2023-08-17T22:41:37-04:00

printf("%d", t.Unix())   // 1692330097
printf("%d", t2.UnixMilli()) // 1692326497260

printf("%v", t2.Before(t))    // true
printf("%s", t2.Format("2006-01-02T15:04:05Z07:00"))
//  2023-08-17T22:41:37-04:00
printf("%s", t2.Round("1h")) // 2023-08-17T23:00:00-04:00

Table Methods

  • IsEmpty()
    ** return true if table is empty, false otherwise
if testTable.IsEmpty() {
  printf("Table is empty")
} else {
  // code block
}
  • RenameColumn(columnName, newColumnName)
    ** rename column "columnName" to "newColumnName"
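An illustrative call, using the testTable that appears throughout this section:

```
testTable.RenameColumn("City", "Town") // the "City" column is now named "Town"
```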

  • RemoveColumn(columnName)
    ** remove columnName from the table

.testTable before RemoveColumn function calls

ID  City           State     Country
1   Rockville      Maryland  US
2   Silver Spring  Maryland  US
3   Baltimore      Maryland  US

testTable.RemoveColumn("State") // removes the State column from testTable
testTable.RemoveColumn("Country") // removes the Country column from testTable

.testTable after RemoveColumn function calls

ID  City
1   Rockville
2   Silver Spring
3   Baltimore

  • GetColumnValues(columnName)
    ** return a list of values on columnName from the table
ID  City           State     Country
1   Rockville      Maryland  US
2   Silver Spring  Maryland  US
3   Baltimore      Maryland  US

testTable.GetColumnValues("City") // returns ["Rockville", "Silver Spring", "Baltimore"]
  • GetKeys()
    ** return list of values from the key column: "ID" before the merge, "_globalID" after the merge.

ID  City           State     Country
1   Rockville      Maryland  US
2   Silver Spring  Maryland  US
3   Baltimore      Maryland  US

testTable.GetKeys() // return [1,2,3]
  • SetColumnUnit(column, unit)
    ** set the unit of column

ID  Item    Cost
1   Pen     2.99
2   Eraser  5.99
3   Ruler   1.99

testTable.SetColumnUnit("Cost", "USD") // sets the unit of the Cost column to USD
  • Sort(limit, "+col1", "-col2"...)
    sort the table by column values and limit to the first N.
    limit = 0 will return all results.
    ** "+" for ascending and "-" for descending, if not specified then defaults to descending order

.testTable before Sort

ID  Item    Cost
1   Pen     2.99
2   Eraser  5.99
3   Ruler   1.99

// return the top 10 rows, sorted by the "Cost" column in descending order
natGateways.Sort(10, "-Cost")
// sort testTable by the "Cost" column in descending order
testTable.Sort(0, "Cost")

.testTable after the Sort function

ID  Item    Cost
2   Eraser  5.99
1   Pen     2.99
3   Ruler   1.99

  • Join(rightTable, ({keyColumn1, keyColumn2...}, {OtherColumns...}) => joinType )
    this method will update the calling table (left table)
    the join configuration is specified as a lambda function
    joinType is one of the following: "inner", "fullouter"
    join rightTable on the keyColumn(s). if "OtherColumns" are not provided, all columns from the rightTable will be joined.
    {ID}: both the left column and the right column are named "ID"
    {ID2:ID}: right table column "ID2" is renamed to "ID" in the left table
bucketTable.Join(byteSummary, ({ID}, {Total_Bytes, Total_Cost}) => "inner")

//
let arr1 = [
  {ID: "a", Col1: 3},
  {ID: "b", Col1: 2}
]
let t1 = jsonTable(arr1)

let arr2 = [
  {ID2: "a", Col2: "foo"},
  {ID2: "c", Col2: "bar"}
]
let t2 = jsonTable(arr2)

//t1.Join(t2, ({ID}) => "inner") 
//t1.Join(t2, ({ID2:ID}, {Col2:Col21}) => "inner") 
t1.Join(t2, ({ID2:ID}, {Col2:Col21}) => "fullouter") 

return {t1}
  • Append(table1, table2 ...)
    ** merge the input tables into the calling table
  • Map( (row) => x )
    ** return a new list populated with the results of calling the provided function on every row in the calling table
  • Filter( (row) => predicate(row) )
    ** remove rows for which the predicate function returns false
  • Each( (row) => { })
    ** for each row, run the lambda function
let arr1 = [
  {ID: "a", Col1: 3},
  {ID: "b", Col1: 2}
]

let t1 = jsonTable(arr1)

let arr2 = [
  {ID2: "a", Col2: "foo"},
  {ID2: "c", Col2: "bar"}
]

let t2 = jsonTable(arr2)

let keyMap = {}
t2.Each( ({ID2})=> {
  keyMap[ID2]=true
})

t1.Filter( ({ID}) => !keyMap[ID])

return {t1} 
  • Aggregate( ({col1, col2 ...}) => { return { groupBy:{groupByKey1,...}, columns:{ aggregates } } } )
    this method will generate a new table
    the input is a lambda function.
    the input object destructuring picks up the columns of the table
    the return value is an object.
    the "groupBy" object is optional and specifies the groupBy column(s).
    the "columns" object specifies the aggregated columns
    sum is the aggregation function
    Total is the column name.
    sum:{Total: col2} defines a new column "Total" which is the sum of the column "col2". this is equivalent to "sum(col2) as Total" in SQL
    sum:{col2} is equivalent to sum:{col2:col2}
    for the "count" aggregate, a bool expression is expected.
    count: {Count:true}
    ** available aggregate functions: count, sum, avg, min, max, values, dcount, coalesce, first, argmin, argmax
   let customerTable = bucketTable.Aggregate(({Customer, S3_Cost}) =>  {
      return { groupBy:{Customer}, columns:{sum:{S3_Cost}} }
   })

** argmax and argmin

let arr = [
  {ID: "a", Col1: "1900", Col2: "abc"},
  {ID: "a", Col1: "1921", Col2: "mno"}
]

let t = jsonTable(arr)
let tg = t.Aggregate( ({ID, Col1, Col2}) => {
    return { groupBy:{ID}, columns:{ argmax:{Col1, Col2} }}
})

return {tg}
  • ColumnAggregate(columnName, unit, (ID, columnName, value, sum) => { }, initialValue)
    ** return a new table with the same ID column, plus one new column which is the aggregated result from the calling table.
// simple sum of all columns from the table named "bucketTable"
// new table "byteSummary" has the same ID column plus one "Total_Bytes" column
let byteSummary = bucketTable.ColumnAggregate("Total_Bytes", "Byte", (ID, col, value, sum) => {
      return sum + value
},0)

// a more complicated example: the lambda calls the AWSPrice API to get the monthly cost of different S3 storage types.
// the table "bucketTable" is derived from the "DimensionTable" method of a metric stream; each storage type has one column
let costSummary = bucketTable.ColumnAggregate("Total_Monthly_Cost", "Dollar", (ID, col, value, sum) => {
      return sum + AWSPrice("S3", "StorageType", {Size:value, Type:col})
}, 0)
  • NewColumnLambda(columnName, unit, (row) => { })
    ** Generate one new column on the calling table. The column value is the return value of the lambda function.
// create a new column "AverageSize" on table "bucketTable". The new column reads the two columns named "Total_Bytes" and "Total_Object_Count" and stores their quotient (the average object size) as the column value.
bucketTable.NewColumnLambda("AverageSize", "Byte", (row) => row.Total_Bytes / row.Total_Object_Count)
// OR 
bucketTable.NewColumnLambda("AverageSize", "Byte", ({Total_Bytes, Total_Object_Count}) => Total_Bytes / Total_Object_Count)
  • NewColumns( (row) => {})
    Generate new columns on the calling table.
    the lambda function returns an object holding the new columns
threats.NewColumns( ( {sha256} ) => {
   let devices = Cylance_GetThreatDevices(sha256)
   let filePaths = []
   let deviceNames = []
   for (let i = 0; i < len(devices); i++) {
       let device = devices[i]
       filePaths = append(filePaths, device.file_path) 
       deviceNames = append(deviceNames, device.name)
   } 
   return {filePaths, deviceNames}
})
  • Clone("Col1", "Col2" ...)
    Generate a new table that is a copy of the calling table, containing only the specified columns.
    If no columns are specified, all columns are copied.
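A short sketch of the column-selecting form (the table and column names are illustrative):

let t3 = t1.Clone("Col1", "Col2") // t3 is a copy of t1 containing only Col1 and Col2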

  • Clone()
    ** Generate a new table which is a copy of the calling table

let t2 = t1.Clone() // t2 is a copy of t1 
  • GetRow(columnMap)
    ** return the row matching the specified column values, or undefined if no row matches
let arr = [
  {col1: "foo", col2: "bar", col3: "abc"}
]
let t = jsonTable(arr)
let row = t.GetRow({col1:"foo", col2:"bar"}) 
// row = {col1: "foo", col2: "bar", col3: "abc"}
// if no row matches, GetRow returns undefined

  • JoinStream(stream, aggregationType, columnName, unit)
    ** Generate one new column on the calling table. The column value is the aggregated result of each data series.
function getNatBandwidth(assetTable) {
  let options = {from: "-24h@h", to: "@h", dimensions: "NatGatewayId", namespace: "AWS/NATGateway", period: "1h", unit:"Byte", stat: "Sum"}
  let filters = {NatGatewayId: assetTable}
  let download = AWS_GetMetric("BytesInFromDestination", options, filters)
  let upload = AWS_GetMetric("BytesOutToDestination", options, filters)
  let localUpload = AWS_GetMetric("BytesInFromSource", options, filters)
  let localDownload = AWS_GetMetric("BytesOutToSource", options, filters)
  let totalBytes = download + upload + localUpload + localDownload
  let processCost = AWS_GetPrice("NatGateway", "GB")
  let hourlyCost =  AWS_GetPrice("NatGateway", "Hour")
  let cost = (hourlyCost * 3600 / totalBytes.GetInterval()) +  totalBytes * processCost / (1024 * 1024 * 1024)
  return {download, upload, totalBytes, cost}
}

function main() {
  return AWS_AccountRegionLambda("*", "*", (account, region) => {
    let natGateways = AWS_LoadAsset("ec2:natgateway", (obj) => {
       let {NatGatewayId:ID, State, VpcId} = obj
       let PublicIp = obj.NatGatewayAddresses[0].PublicIp
       return {ID, State, VpcId, PublicIp}
    })

    let {totalBytes} = getNatBandwidth(natGateways)
    natGateways.JoinStream(totalBytes,"Sum", "TotalBytes", "Byte")
    return {natGateways}
  })
}

Metric Stream Methods

  • IsEmpty()
    ** return true if the stream has no data series
cpu.IsEmpty()
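Because FPL supports real control flow, the result can guard later processing. A minimal sketch (the `cpu` stream and the error message are illustrative):

if (cpu.IsEmpty()) {
   throw new Error("no CPU metric data returned")
}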
  • Sort(limit, "AggregationType1", "AggregationType2"...)
    ** sort the data series of the stream by the specified aggregation(s) and keep the top `limit` series
// top 10 CPU utilizations
cpu.Sort(10, "Average")
  • ReplaceKey(keyValueMap)
    ** replace metric keys based on a key/value map
  // replace component metric key (component.id) with component name
  let idMap = {}
  let components = Platform_LoadComponent()
  components.Each( (_, c) => {
    idMap[c.id] = c.name
  })

  let {table:sources, histogram: sourceHistogram} = groupByBytes(from, to, "platform_component_bytes", `component="datasource"`,  "id", 10)

  sources.NewColumnLambda("name", "", (row) => idMap[row.id])
  sourceHistogram.ReplaceKey(idMap)
  • SummaryTable(column, unit, aggregationType)
    create a new table with a new column holding the aggregation result for each data series
    aggregationType: Sum|Average|Min|Max|Count|Last
// create a new table "invocationSummary" with a column "Total_Invocations"
let invocationSummary = lambdaInvocations.SummaryTable("Total_Invocations", "Count", "Sum")
  • TimeTable(timeFormat, unit)
    creates a new table where each column is a time slot rendered with timeFormat.
    timeFormat follows the Golang Time Format
let timeTable = balance.TimeTable("Jan 02 15:04:05", "Percent")
  • DimensionTable(dimension, unit, aggregationType)
    ** for a metric stream with two dimensions: one dimension remains the row key; each value of the other dimension becomes a new column in the created table
function getS3BucketSize(assetTable) {
 let options = {from:"-48h@d", to:"@d", dimensions:["BucketName","StorageType"], namespace:"AWS/S3", period:"24h", stat:"Average"}
 let filters = {BucketName:assetTable}
 let size = AWS_GetMetric("BucketSizeBytes", options, filters)
 let objCount = AWS_GetMetric("NumberOfObjects", options, filters)
 return {size, objCount}
}

function main() {
  return AWS_AccountRegionLambda("*", "*", (account, region) => {
    let buckets = AWS_LoadAsset("s3:bucket", (obj) => { return {ID: obj.Name} })
    let {size, objCount} = getS3BucketSize(buckets)
    let bucketTable = size.DimensionTable("StorageType","Byte","Last")
    let countTable = objCount.DimensionTable("StorageType","Count","Last") 
    return {bucketTable, countTable}
  })
}
  • SetTags(assetTable)
    ** convert asset table columns into tags for the metric stream key
  • SetUnit(unit)
    ** set unit for metric stream
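A brief sketch of both methods, assuming a `buckets` asset table and `options`/`filters` as in the DimensionTable example above:

let size = AWS_GetMetric("BucketSizeBytes", options, {BucketName: buckets})
size.SetUnit("Byte")   // mark the stream values as bytes
size.SetTags(buckets)  // attach the asset table columns as tags on each series key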

FplAlert Methods

  • Limit(n)
    ** keep the top N anomalies

  • Emit(name, description, severity, OffDelay)
    severity: error | warn | info
    OffDelay: the alert is cleared after OffDelay seconds; -1 means it never expires
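A hypothetical sketch, assuming `anomalies` is an FplAlert object produced by an upstream anomaly-detection step (the name, description, and values are illustrative):

anomalies.Limit(5)
anomalies.Emit("HighCPU", "CPU utilization anomaly detected", "warn", 3600)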

Built-in Resource Loading Support

  • AWS_Cli_List(<cmd_line>, (obj) => { })
    ** list AWS assets via AWS cli

  • AWS_Cli_Get(<cmd_line>, idList, (id, obj) => {})
    ** get asset attributes from a list of ID

  • NOTE: AWS_Cli_List and AWS_Cli_Get are not enabled for production deployment, for security reasons: if the role's IAM policy is not properly configured, they may expose security issues.

let natGateways = AWS_Cli_List("ec2 describe-nat-gateways", (obj) => {
      let ID = obj.NatGatewayId
      let State = obj.State
      let VpcId = obj.VpcId
      let PublicIp = obj.NatGatewayAddresses[0].PublicIp
      return {ID, State, VpcId, PublicIp}
})
// call AWS cli:  "aws ec2 describe-nat-gateways"
// same as AWS_LoadAsset( "ec2:natgateway", ...
function main() {
   return AWS_AccountRegionLambda("*", "us-west-2", () => {
      let queues = AWS_Cli_List("sqs list-queues", (url) => {
         let QueueUrl = url
         let segments = split(QueueUrl, "/")
         let ID = segments[len(segments)-1]
         let fifo = endsWith(ID, ".fifo")
         return { ID, QueueUrl, fifo }
      })

      let queueTags = AWS_Cli_Get("sqs list-queue-tags --queue-url", queues.GetColumnValues("QueueUrl"), (id, obj) => {
           let QueueUrl = id
           let TagCount = len(obj.Tags)
           return {QueueUrl, TagCount}
      })

      let queueAttributes = AWS_Cli_Get("sqs get-queue-attributes --attribute-names All  --queue-url", queues.GetColumnValues("QueueUrl"), (id, obj) => {
           let QueueUrl = id
           let QueueArn = obj.Attributes.QueueArn
           return {QueueUrl, QueueArn}
      })
      queues.Join(queueTags, {QueueUrl:"QueueUrl"})
      queues.Join(queueAttributes, {QueueUrl:"QueueUrl"})
      return {queues}
   })
}
  • AWS_LoadAsset(resource, (obj) => { })

Loads AWS resources of the given type and converts them into a table.
resource:
lambda:function
ec2:vpc
ec2:instance
ec2:volume
s3:bucket
ec2:natgateway
eks:cluster
eks:nodegroup
sqs:queue
elasticloadbalancing:loadbalancer
elasticloadbalancing:targetgroup
apigateway:apis
The lambda must return an ID field.
If the lambda function returns null, the entry is skipped (filterMap behavior).
Extracting values from tags:
  • jsonGetTag(obj, tagsField, keyField, key, valueField)
  • jsonGetAWSTag(obj, key)

// AWS_LoadAsset example
function main() {
    return AWS_AccountRegionLambda("FluencySIEM", "us-east-1", () => {
        let clusters = AWS_LoadAsset("eks:cluster", (obj) => {
            let {Name:ID, Status, CreatedAt} = obj        
            return {ID, Status, CreatedAt}
        })
        return {clusters}
    })
}

/*
  "Tags": [
    {
      "Key": "Name",
      "Value": "my-instance"
    }
  ],
*/
// Given the "Tags" array above, the tag value can be extracted with either:
let Name = jsonGetTag(obj, "Tags", "Key", "Name", "Value")
// OR the AWS-style shortcut:
let Name = jsonGetAWSTag(obj, "Name")

.clusters Table
|===
|ID |Status | CreatedAt |_account |_region

|my-cluster
|ACTIVE
|2023-07-04T18:16:35.35Z
|FluencySIEM
|us-east-1
|===

  • AWS_LoadAsset with aggregate/groupBy
    the lambda returns { aggregate: { groupBy:{groupByKey1, ...}, columns:{ aggregates }}}
    the groupBy and columns use the same format as table.Aggregate()
return AWS_AccountRegionLambda("*","*", () => {
      let volumes = AWS_LoadAsset("ec2:volume",({VolumeType, State, Iops, Size}) => {
          return {aggregate:{ groupBy:{VolumeType}, columns: {Sum:{Size}}}}
      })             
      volumes.SetColumnUnit("Size", "GB")
      return {volumes}
})

Built-in Metric Loading Support

  • AWS_GetMetric(metricName, options, filters) // load AWS metrics
    options: {from, to, dimensions, namespace, period, stat, unit, timezone}
    options.dimensions could be one string or a list of strings
    ** filters: {dimensionName: assetTable}
function getLambdaCost(assetTable) {
  let options = {from: "-60m@m", to: "@m", dimensions: "FunctionName", namespace: "AWS/Lambda", period: "5m", stat: "Sum"}
  let filters = {FunctionName:assetTable}
  options.unit = "Millisecond"
  let duration = AWS_GetMetric("Duration", options, filters)
  options.unit = "Count"
  let invocation = AWS_GetMetric("Invocations", options, filters)
  return {duration, invocation}
}

Built-in AWS Pricing API

  • AWS_GetPrice(service, resource, options)
    service: "Lambda", resource: "GB-Second" , "Request"
    service: "S3", resource: "StorageType"
    service: "NatGateway", resource "GB" , "Hour"
    service: "ApplicationLoadBalancer", resource "Hour", "LCU-Hour"
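For example, following the usage in getNatBandwidth above:

let processCost = AWS_GetPrice("NatGateway", "GB")   // per-GB data processing price
let hourlyCost = AWS_GetPrice("NatGateway", "Hour")  // per-hour gateway price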

  • AWS_GetCostUsage(options)
    from: report start time
    to: report end time
    metric: AmortizedCost | BlendedCost | UnblendedCost | UsageQuantity
    granularity: DAILY | HOURLY
    dimensions: AZ, INSTANCE_TYPE, LEGAL_ENTITY_NAME, INVOICING_ENTITY, LINKED_ACCOUNT, OPERATION, PLATFORM, PURCHASE_TYPE, SERVICE, TENANCY, RECORD_TYPE, and USAGE_TYPE
    tags: customer defined cost allocation tags

function main() {
 return AWS_AccountLambda("Production", () => {
    let dailyUsage=AWS_GetCostUsage({from:"-60d@d", to:"-1d@d", metric:"UsageQuantity", granularity:"DAILY"})
    let dailyBlended=AWS_GetCostUsage({from:"-30d@d", to:"-1d@d", metric:"BlendedCost", granularity:"DAILY"})
    let dailyUnBlended=AWS_GetCostUsage({from:"-30d@d", to:"-1d@d", metric:"UnblendedCost", granularity:"DAILY"})
    let dailyAmortized=AWS_GetCostUsage({from:"-60d@d", to:"-1d@d", metric:"AmortizedCost", granularity:"DAILY"})
    let dailyCostByService=AWS_GetCostUsage({from:"-30d@d", to:"-1d@d", metric:"AmortizedCost", granularity:"DAILY", dimensions:"SERVICE"})
    dailyCostByService.Sort(10)
    return {dailyUsage, dailyBlended, dailyUnBlended, dailyAmortized, dailyCostByService}
 })
} 

Comparison with SQL and Splunk Processing Language

  • Language Design
    SQL/SPL are essentially query "scripts": no if/else, no loops. Difficult for programmers to learn and extend.
    FPLv2: javascript es6 grammar. Real programming language with if/else statement, for loop and exception support.
  • Data Source
    SQL: relational database
    SPL: data lake
    FPLv2: data lake, any document based database, key-value store, time series database (TSDB). Support both json document store and metric data stream.
    FPLv2: support data source based on cloud API, such as cloudwatch getmetric api, AWS management "describe" and "list_" APIs.
  • Throughput and Efficiency
    ** FPLv2: Native execution in Golang. Built-in support for parallel multi-account, multi-region data queries.
  • Report/Alert
    ** FPLv2: Fully automated anomaly detection. Support table/chart/alert rendering.
  • Data streaming support
    SQL/SPL: n/a
    FPLv2: support streaming mode, parse/normalize streaming data

Code comparison:

  • SELECT
// SQL
SELECT col1, col2 FROM table1 WHERE col3 = 'hello'

// FPLv2
Load("remoteAsset", ({col1, col2, col3}) => { 
                       if (col3 == "hello") {
                          return {col1, col2}
                       }
                       return null
                    }) 
   
  • Aggregate/GROUPBY
// SQL
SELECT col4, sum(col2) FROM table1
WHERE col3 = 'hello'
GROUP BY col4

// FPLv2
Load("remoteAsset", ({col1, col2, col3, col4}) => { 
                       if (col3 == "hello") {
                          return { aggregate: {groupBy:{col4}, columns:{sum:{col2}}}}
                       }
                       return null
                    }) 

  • JOIN
// SQL
SELECT * FROM table1
INNER JOIN table2
ON table1.col1 = table2.col2

// FPLv2
table1.Join(table2, ({ID2:ID1}, {col21, col22 ...}) => "inner" )
  • Sort
// SQL
SELECT * FROM table1
ORDER BY col1 DESC
LIMIT 10
// FPLv2
table1.Sort(10, "-col1")
// OR method chaining
Load("remoteAsset", ( {col1, col2, col3, col4}) => { 
                       if (col3 == "hello") {
                          return { aggregate: {groupBy:{col4}, columns:{sum:{Total:col2}}}}
                       }
                       return null
                    }).Sort(10, "-Total")