README
druid-query
Simple querying for Druid (http://druid.io) in Node.js. Inspired by ruby-druid.
Table of Contents:
Installation
npm install druid-query --save
Example (simple)
var Druid = require('druid-query')
, Client = Druid.Client
, Query = Druid.Query
, client = new Client('http://127.0.0.1:8080')
var q1 = client.groupBy()
q1.dataSource('randSeq')
q1.granularity('all')
q1
.dimensions([])
.aggregation('count', 'rows')
.aggregation('doubleSum', 'e', 'events')
.aggregation('doubleSum', 'randomNumberSum', 'outColumn')
.postAggregation('arithmetic', 'avg_random', '/', [
Query.postAggregation('fieldAccess', null, 'randomNumberSum'),
Query.postAggregation('fieldAccess', null, 'rows')
])
.interval(Date.UTC(2012, 9, 1), Date.UTC(2020, 0, 1))
q1.exec(function(err, result) {
if (err) {
// handle error...
}
else {
beCoolWith(result)
}
})
var q2 = new Druid.TimeBoundaryQuery()
q2.dataSource('wikipedia')
client.exec(q2, function(err, result) {
// handle results
})
Example (ZooKeeper)
var Druid = require('druid-query')
, druid = new Druid('localhost:2181,localhost:2182/druid', '/broker', {preferSSL: true})
var query = druid.groupBy('myCoolDS')
query
.filter('selector', 'dimension1', 100500)
.dimensions('dimension2', 'dimension3')
.granularity('day')
.aggregation('count', 'howMany')
.interval(Date.UTC(2012, 0, 1), Date.UTC(2015, 0, 1))
.exec(function(err, result) {
// handle error
// handle result
})
var anotherQuery = new Druid.SegmentMetadataQuery()
anotherQuery.dataSource('superDS')
anotherQuery.interval('2011-01-01/2012-01-01')
anotherQuery.interval('2013-01-01/2014-01-01')
druid.exec(anotherQuery, function(err, results) {
if (err) {
// error reasons:
// 1. data source is not served by any known node
// 2. query validation error
// 3. error from Druid node after executing query
}
else {
// handle results
}
})
druid.once('ready', function() {
// Do what you want with this event :-)
})
druid.on('error', function(err) {
// handle client error here
})
// Call .end() when finished working with Druid
druid.end()
API
- Druid
- Events
- Druid(connectionString, discoveryPath, [options])
- #cancel(query, callback)
- #end()
- #exec(query, callback)
- #getDataSources()
- #getNodes()
- #groupBy(dataSource, [rawQuery])
- #search(dataSource, [rawQuery])
- #segmentMetadata(dataSource, [rawQuery])
- #timeBoundary(dataSource, [rawQuery])
- #timeseries(dataSource, [rawQuery])
- #topN(dataSource, [rawQuery])
- Client
- Query
- Field setters (Query methods)
- .aggregation(type, name, [args...])
- .aggregations(list...)
- .extractionFunction(type, [args...])
- .filter(type, [args...])
- .having(type, [args...])
- .interval(start, [end])
- .orderBy(dimension, [direction])
- .postAggregation(type, name, [args...])
- .postAggregations(list...)
- .query(type, value, caseSensitive)
- #aggregation(type, name, [args...])
- #aggregations(list...)
- #bound(value)
- #context(data)
- #dataSource(type, args...)
- #dimension(dimension, [outputName], [extractFn])
- #dimensions(list...)
- #filter(type, [args...])
- #granularity(type, [args...])
- #having(type, [args...])
- #interval(start, [end])
- #intervals(intervals...)
- #limitSpec(type, limit, orderByColumns)
- #merge(value)
- #metric(type, [args...])
- #postAggregation(type, name, [args...])
- #postAggregations(list...)
- #query([type], value...)
- #queryType(type)
- #searchDimensions(list...)
- #sort(type)
- #threshold(value)
- #toInclude(value)
Druid
Client which uses ZooKeeper to get data about Druid nodes and then gets data sources served by each node.
Events
ready
- emitted when client finished (re)loading of nodes data (so it's ready to use). If client occasionally loses connection to ZooKeeper, it's re-established and client loads node data again.
error
- emitted when client receives any kind of error.
Druid(connectionString, discoveryPath, [options])
Create client instance.
Arguments
- connectionString
string
- ZooKeeper connection string. - discoveryPath
string
- Service discovery path. - options
object
- Client options.zookeeper
- Options passed to node-zookeeper-client
createClient() function.preferSSL
- Use SSL port of Druid node if available. Default:false
.
void cancel(query, callback)
Cancel query. Works same way as #exec(query, callback).
Arguments
- query
Query
-Query
(or descendant class) instance. - callback
function
- Callback function with following signature:(err)
.
void end()
End working with client.
void exec(query, callback)
Run query
on suitable node.
If client is not ready
(read Events section above) method will wait for ready
or error
event to continue.
If query
data source is not among those served by found nodes, callback
will be called with corresponding error.
Once node with least number of concurrent running queries is chosen, query
is sent to it.
Arguments
- query
Query
-Query
(or descendant class) instance. - callback
function
- Callback function with following signature:(err, result)
.
getDataSources() string[]
Get list of data sources.
getNodes() DruidNode[]
Get list of Nodes available.
DruidNode
extends Druid.Client
. It keeps ZooKeeper node data and number of concurrent running queries (used for simple load-balancing).
groupBy(dataSource, [rawQuery]) GroupByQuery
search(dataSource, [rawQuery]) SearchQuery
segmentMetadata(dataSource, [rawQuery]) SegmentMetadataQuery
timeBoundary(dataSource, [rawQuery]) TimeBoundaryQuery
timeseries(dataSource, [rawQuery]) TimeseriesQuery
topN(dataSource, [rawQuery]) TopNQuery
Return query instance with dataSource
set. Query is attached to calling Druid
instance, so Druid#exec(query, callback) is called to execute query.
Arguments
- dataSource
string
- name of data source to create Query
for. - rawQuery
object
- passed to Query
constructor as second argument.
Client (Druid.Client)
Base client class which uses Druid node URL.
Client(url)
Create client instance.
Arguments
- url
string
- Druid node URL.
void fromZooKeeper(connectionString, discoveryPath, [options], callback) static
Lookup Druid services via ZooKeeper using node-zookeeper-client and choose random node. For chosen node Client
instance is created.
Arguments
- connectionString
string
- ZooKeeper connection string. - discoveryPath
string
- service discovery path. - options
object
- Lookup options. We have only one option currently available:preferSSL
- Use SSL port of Druid node if available. Default:false
.
void cancel(query, callback)
Cancel query.
Arguments
- query
Query
- Query object. - callback(err)
function
- The callback function.
void dataSources(callback)
Get list of dataSources.
Arguments
- callback(err, dataSources)
function
- The callback function.
void exec(query, callback)
Execute query.
Arguments
- query
Query
- Query object. - callback(err, result)
function
- The callback function.
groupBy([rawQuery]) GroupByQuery
search([rawQuery]) SearchQuery
segmentMetadata([rawQuery]) SegmentMetadataQuery
timeBoundary([rawQuery]) TimeBoundaryQuery
timeseries([rawQuery]) TimeseriesQuery
topN([rawQuery]) TopNQuery
Create Query
instance and attach it to client.
Arguments
- rawQuery
object
- passed as second argument to Query
constructor.
Query (Druid.Query)
Note: each field method returns field value if no arguments specified.
Query(client, [rawQuery])
Create query instance
Arguments
- client
Client
- Client instance. - rawQuery
object
- Raw query data (so you can call Query#exec(callback)
or Druid#exec(query, callback)
right after creating Query
object). Keep in mind that if constructor is not base Query
class (e.g. GroupByQuery
), queryType
property is first removed from rawQuery
object to prevent errors.
void cancel(callback)
Cancel query. context.queryId
should be set for this.
Arguments
- callback(err)
function
- The callback function.
void exec(callback)
Execute query (only if it's attached to client e.g. created by some client instance).
Arguments
- callback(err, result)
function
- The callback function.
toJSON() object
Returns query data.
staticobject
aggregation(type, name, [args...])
Create aggregation spec.
Arguments
- type
string | object
- Aggregation type:cardinality
,count
,doubleSum
,hyperUnique
,javascript
,longSum
,max
,min
. Also you can specify aggregation as object in this argument. - name
string
- Aggregation output name. - args
...*
- Aggregation-specific arguments.
Query.aggregation('cardinality', name, fieldNames, byRow)
- fieldNames
string[]
- Fields to compute cardinality over. - byRow
boolean
- If we should compute cardinality over distinct combinations. Default:false
.
Query.aggregation('count', name)
- No arguments here
Query.aggregation('doubleSum', name, fieldName)
- fieldName
string
- Name of the metric column to sum over.
Query.aggregation('hyperUnique', name, fieldName)
- fieldName
string
- Dimension name.
Query.aggregation('javascript', name, fieldNames, aggregateFn, combineFn, resetFn)
- fieldNames
string[]
- Names of fields which are passed to aggregate function. - aggregateFn
string | function
- Aggregation function. - combineFn
string | function
- Combines partials. - resetFn
string | function
- Initialization function.
Query.aggregation('longSum', name, fieldName)
- fieldName
string
- Name of the metric column to sum over.
Query.aggregation('max', name, fieldName)
- fieldName
string
- Name of the metric column.
Query.aggregation('min', name, fieldName)
- fieldName
string
- Name of the metric column.
staticobject[]
aggregations(list...)
Return array of aggregations.
Arguments
- list
object[] | object...
- Array of aggregation specs. Specs can be returned by Query.aggregation() or raw JavaScript objects.
staticobject
extractionFunction(type, [args...])
Create DimExtractionFn
spec.
Arguments
- type
string | object
- Spec type:javascript
,partial
,regex
,searchQuery
,time
- or DimExtractionFn spec object. - args
...*
- Function-specific arguments.
Query.extractionFunction('javascript', fn)
- fn
string | function
- JavaScript function.
Query.extractionFunction('partial', regex)
- regex
string | RegExp
- Regular expression to match.
Query.extractionFunction('regex', regex)
- regex
string | RegExp
- Regular expression to match.
Query.extractionFunction('searchQuery', query...)
- query
object | ...*
- If one argument is specified we treat it as SearchQuerySpec
object. Otherwise Query.query() is called for all the passed arguments.
Query.extractionFunction('time', input, output)
- input
string
- Input time format. - output
string
- Output time format.
staticobject
filter(type, [args...])
Create filter spec.
Arguments
- type
string | object
- Filter type:and
,javascript
,not
,or
,regex
,selector
,search
,in
- or raw filter object. - args
...*
- Filter-specific arguments. Described below.
Query.filter('and', filters...)
- filters
object[] | ...object
- List of filters forAND
.
Query.filter('javascript', dimension, fn)
- dimension
string
- Dimension to which filter is applied. - fn
string | function
- Function to apply (should return boolean value).
Query.filter('not', filter...)
- filter
object | ...*
- If this argument is object we use it as filter spec. Otherwise all arguments are passed again to Query.filter().
Query.filter('or', filters...)
- filters
object[] | ...object
- List of filters forOR
.
Query.filter('regex', dimension, pattern)
- dimension
string
- Dimension to which filter is applied. - pattern
string
- Java regular expression.
Query.filter('selector', dimension, value)
- dimension
string
- Dimension to which filter is applied. - value
*
- Value to match.
Query.filter('search', dimension, query)
- dimension
string
- Dimension to which filter is applied. - query
*
-SearchQuerySpec
object
Query.filter('in', dimension, values)
- dimension
string
- Dimension to which filter is applied. - values
object[]
- Values to match.
staticobject
having(type, [args...])
Create having
spec.
Arguments
- type
string | object
- HavingSpec object or type:and
,equalTo
,greaterThan
,lessThan
,not
,or
. - args
...*
- Arguments specific to spec type.
Query.having('and', specs...)
- specs
object[] | ...object
- List of specs forAND
operation.
Query.having('equalTo', aggregation, value)
- aggregation
string
- Aggregation name. - value
*
- Value to match.
Query.having('greaterThan', aggregation, value)
- aggregation
string
- Aggregation name. - value
*
- Value to compare.
Query.having('lessThan', aggregation, value)
- aggregation
string
- Aggregation name. - value
*
- Value to compare.
Query.having('not', spec...)
- spec
object | ...*
- If first argument is object we use it as having spec. Otherwise all arguments are passed again to Query.having().
Query.having('or', specs...)
- specs
object[] | ...object
- List of specs forOR
operation.
staticobject
interval(start, [end])
Create interval string.
If only one argument is specified, it's treated as an interval string.
Arguments
- start
string | number | Date
- Interval string or start time as timestamp, date string orDate
object. - end
string | number | Date
- End time.
staticobject
orderBy(dimension, [direction])
Create OrderBy spec.
Arguments
- dimension
string
- Dimension to sort by. - direction
string
- Sorting direction. Default:ASCENDING
.
staticobject
postAggregation(type, name, [args...])
Create post-aggregation spec.
Arguments
- type
string | object
- Post-aggregation type:arithmetic
,constant
,fieldAccess
,hyperUniqueCardinality
,javascript
. Or it can be ready-to-use post-aggregation object (no need for other arguments in this case, of course). - name
string
- Post-aggregation output name. - args
...*
- Post-aggregation specific arguments. Read about arguments below.
Query.postAggregation('arithmetic', name, op, fields)
- op
string
- Arithmetic operation: +, -, * or /. - fields
object[] | ...object
- List of Post-Aggregation specs: raw objects or Query.postAggregation() results.
Query.postAggregation('constant', name, value)
- value
*
- Constant value.
Query.postAggregation('fieldAccess', name, fieldName)
- fieldName
string
- Name of aggregator field. If not specifiedpostAggregation()
second argument (name
) is used asfieldName
instead.
Query.postAggregation('hyperUniqueCardinality', name, fieldName)
- fieldName
string
- Name of hyperUnique aggregator. If not specifiedpostAggregation()
second argument (name
) is used asfieldName
instead.
Query.postAggregation('javascript', name, fieldNames, fn)
- fieldNames
string[]
- List of aggregator names - passed as arguments to function. - fn
string | function
- Post-aggregator function.
staticobject[]
postAggregations(list...)
Return array of post-aggregation specs.
Arguments
- list
object[] | object...
- Array of post-aggregation specs. They can be ones returned by Query.postAggregation() or raw JavaScript objects.
staticobject
query(type, value...)
Create SearchQuery spec.
Arguments
- type
string | object
- SearchQuery type:insensitive_contains
,fragment
. Or ready SearchQuerySpec object. - value
string | string[] | ...string
- Value(s) to match. Iftype
isfragment
value (or all the values) is treated as array. If type isinsensitive_contains
value is used asstring
.
aggregation(type, name, [args...]) Query
Add aggregation spec to aggregations
.
Arguments
- type
string | object
- Aggregation type:cardinality
,count
,doubleSum
,hyperUnique
,javascript
,longSum
,max
,min
. Or aggregation spec as JS object. - name
string
- Aggregation output name. - args
...*
- Aggregation specific arguments. Read above about arguments in Query.aggregation() description.
aggregations(list...) Query
Set aggregations
field.
Arguments
- list
object[] | object...
- Array of aggregation specs. Specs can be returned by Query.aggregation() or raw JavaScript objects.
bound(value) Query
Set bound
field for TimeBoundaryQuery.
Arguments
- value
string
- Must be either"minTime"
or"maxTime"
. Otherwise error is thrown.
context(data) Query
Set context
field. Read more about it here.
Arguments
- data
object
timeout
number
priority
number
queryId
string
useCache
boolean
populateCache
boolean
bySegment
boolean
finalize
boolean
dataSource(type, args...) Query
Set dataSource
field
Arguments
- type
string | object
- Data source type. Or data source as string or as object (DataSource structure). - args
...*
- Arguments specific to each data source type.
Query#dataSource('table', name)
- name
string
- Name of data source.
Query#dataSource('query', subQuery)
- subQuery
object | Query
- Sub-query as Query instance or raw query object.
dimension(dimension, [outputName], [extractFn]) Query
Set DimensionSpec.
If first argument is an object, then just use it as DimensionSpec.
Otherwise, depending on the number of arguments, a default or extraction dimension spec is created.
If second or third argument is object ExtractionDimensionSpec is created.
In other cases DefaultDimensionSpec is created.
Arguments
- dimension
string | object
- Dimension to operate on. Or dimension definition as object. - outputName
string
- Dimension output name. - extractFn
object
- Extraction function spec created by Query.extractionFunction() or raw JavaScript object.
dimensions(list...) Query
Set dimensions.
Arguments
- list
string[] | ...string
- Dimensions list.
filter(type, [args...]) Query
Set filter spec.
Arguments
- type
string | object
- Filter type:and
,javascript
,not
,or
,regex
,selector
. Otherwise whole filter object can be specified as first argument. - args
...*
- Filter-specific arguments. They are described in Query.filter() method description.
granularity(type, [args...]) Query
Set granularity of query.
Arguments
- value
string | object
- Granularity as string or object. Ifvalue
is string it must be one of those:all
,none
,minute
,fifteen_minute
,thirty_minute
,hour
,day
plusduration
andperiod
which mean granularity spec object is created. - args
...*
- Specific arguments (in case ifvalue
isperiod
orduration
).
Query#granularity('duration', duration, [origin])
- duration
string | number
- Duration value in ms. - origin
string | number | Date
- Start time (optional).
Query#granularity('period', period, [timeZone], [origin])
- period
string
- ISO-8601 duration format. - timeZone
string
- Timezone. Default: UTC (optional). - origin
string | number | Date
- Start time (optional).
having(type, [args...]) Query
Set having
field.
Arguments
- type
string | object
- HavingSpec object or type:and
,equalTo
,greaterThan
,lessThan
,not
,or
. - args
...*
- Arguments specific to spec type. They are described in Query.having().
interval(start, [end]) Query
Add interval string to intervals
field.
Arguments
- start
number | string | Date
- Start time or interval string. - end
number | string | Date
- End time.
intervals(intervals...) Query
Set intervals.
Arguments
- list
string[] | ...string
- List of interval strings.
limitSpec(type, limit, orderByColumns) Query
Set LimitSpec field.
Arguments
- type
string | object
- raw LimitSpec object or LimitSpec type. - limit
number
- Limit of records returned. - orderByColumns
object[] | string[]
- OrderBy specs array. Specs can be strings or results of Query.orderBy()
merge(value) Query
Set merge
field value.
Arguments
- value
boolean
- Merge all individual segment metadata results into a single result.
metric(type, [args...]) Query
Set TopNMetricSpec
identified by metric
value.
Arguments
- type
string | object
-TopNMetricSpec
object or spec type:alphaNumeric
,lexicographic
,numeric
. - args
...*
- Arguments specific to spec type. They are described below.
Query#metric('alphaNumeric', [previousStop])
- previousStop
string
- The starting point of the alpha-numeric sort (optional).
Query#metric('lexicographic', [previousStop])
- previousStop
string
- The starting point of the lexicographic sort (optional).
Query#metric('numeric', metric)
- metric
string
- The actual metric field by which results will be sorted.
postAggregation(type, name, [args...]) Query
Add post-aggregation spec to postAggregations
array.
Arguments
- type
string | object
- Post-aggregation type:arithmetic
,constant
,fieldAccess
,hyperUniqueCardinality
,javascript
. It can be post-aggregation object itself. - name
string
- Post-aggregation output name. - args
...*
- Post-aggregation specific arguments. Read above about arguments in Query.postAggregation() method description.
postAggregations(list...) Query
Set postAggregations
field.
Arguments
- list
object[] | object...
- Array of post-aggregation specs. They can be ones returned by Query.postAggregation() or raw JavaScript objects.
query([type], value, caseSensitive) Query
Set SearchQuery spec (query
field).
Arguments
- type
string | object
- SearchQuery type:insensitive_contains
,fragment
,contains
. Or it can beSearchQuerySpec
object. - value
string | string[]
- Value(s) to match. Iftype
isfragment
value is treated as array. If type isinsensitive_contains
value is used asstring
. - caseSensitive
boolean
- Whether strings should be compared as case sensitive or not. Has no effect for typeinsensitive_contains
.
queryType(type) Query
Set type of query. This method should be used only if you're using Query
base class. All the Query
descendants have queryType
field set automatically.
Arguments
- type
string
- Valid query type:groupBy
,search
,segmentMetadata
,timeBoundary
,timeseries
,topN
.
searchDimensions(list...) Query
Set searchDimensions
field.
Arguments
- list
string[] | ...string
- Dimensions list.
sort(type) Query
Set sort
field.
Arguments
- type
string
- Sorting type:lexicographic
orstrlen
.
threshold(value) Query
Set threshold
value.
Arguments
- value
number
- Threshold number value.
toInclude(value) Query
Set toInclude
field - columns which should be returned in result.
Arguments
- value
string | string[] | object
-all
,none
or array of column names (list) ortoInclude
raw spec data as object.
Queries
GroupBy (Druid.GroupByQuery)
http://druid.io/docs/0.6.121/GroupByQuery.html
client
.groupBy()
.dataSource('sample_datasource')
.granularity('day')
.dimensions('dim1', 'dim2')
.limitSpec('default', 5000, ['dim1', 'metric1'])
.filter('and', [
Query.filter('selector', 'sample_dimension1', 'sample_value1'),
Query.filter('or', [
Query.filter('selector', 'sample_dimension2', 'sample_value2'),
Query.filter('selector', 'sample_dimension3', 'sample_value3')
])
])
.aggregation('longSum', 'sample_name1', 'sample_fieldName1')
.aggregation('doubleSum', 'sample_name2', 'sample_fieldName2')
.postAggregation('arithmetic', 'sample_divide', '/', [
Query.postAggregation('fieldAccess', 'sample_name1', 'sample_fieldName1'),
Query.postAggregation('fieldAccess', 'sample_name2', 'sample_fieldName2')
])
.intervals(new Date('2012-01-01T00:00:00.00'), new Date('2012-01-03T00:00:00.000'))
.having('greaterThan', 'sample_name1', 0)
.exec(/* result callback */)
Search (Druid.SearchQuery)
http://druid.io/docs/0.6.121/SearchQuery.html
client
.search()
.dataSource('sample_datasource')
.granularity('day')
.searchDimensions('dim1', 'dim2')
.query('insensitive_contains', 'Ke')
.sort('lexicographic')
.intervals(new Date('2013-01-01T00:00:00.000'), new Date('2013-01-03T00:00:00.000'))
.exec(/* result callback */)
Segment Metadata (Druid.SegmentMetadataQuery)
http://druid.io/docs/0.6.121/SegmentMetadataQuery.html
client
.segmentMetadata()
.dataSource('sample_datasource')
.intervals(new Date('2013-01-01'), new Date('2014-01-01'))
.exec(/* result callback */)
Time Boundary (Druid.TimeBoundaryQuery)
http://druid.io/docs/0.6.121/TimeBoundaryQuery.html
client
.timeBoundary()
.dataSource('sample_datasource')
.exec(/* result callback */)
Timeseries (Druid.TimeseriesQuery)
http://druid.io/docs/0.6.121/TimeseriesQuery.html
client
.timeseries()
.dataSource('sample_datasource')
.granularity('day')
.filter('and', [
Query.filter('selector', 'sample_dimension1', 'sample_value1'),
Query.filter('or', [
Query.filter('selector', 'sample_dimension2', 'sample_value2'),
Query.filter('selector', 'sample_dimension3', 'sample_value3')
])
])
.aggregation('longSum', 'sample_name1', 'sample_fieldName1')
.aggregation('doubleSum', 'sample_name2', 'sample_fieldName2')
.postAggregation('arithmetic', 'sample_divide', '/', [
Query.postAggregation('fieldAccess', 'sample_name1', 'sample_fieldName1'),
Query.postAggregation('fieldAccess', 'sample_name2', 'sample_fieldName2')
])
.intervals(new Date('2013-01-01T00:00:00.000'), new Date('2013-01-03T00:00:00.000'))
.exec(/* result callback */)
TopN (Druid.TopNQuery)
http://druid.io/docs/0.6.121/TopNQuery.html
client
.topN()
.dataSource('sample_data')
.dimension('sample_dim')
.threshold(5)
.metric('count')
.granularity('all')
.filter('and', [
Query.filter('selector', 'dim1', 'some_value'),
Query.filter('selector', 'dim2', 'some_other_val')
])
.aggregation('longSum', 'count', 'count')
.aggregation('doubleSum', 'some_metric', 'some_metric')
.postAggregation('arithmetic', 'sample_divide', '/', [
Query.postAggregation('fieldAccess', 'some_metric', 'some_metric'),
Query.postAggregation('fieldAccess', 'count', 'count')
])
.intervals(new Date('2013-08-31T00:00:00.000'), new Date('2013-09-03T00:00:00.000'))
.exec(/* result callback */)
TODO
- More tests
LICENSE
MIT