Tools & Queries
Rockfish MCP provides SQL-based tools for querying Parquet data.
Available Tools
| Tool | Description |
|---|---|
| list_sources | List configured data sources |
| schema | Get column names and types |
| query | Query with filters and column selection |
| aggregate | Group and aggregate data |
| sample | Get random sample rows |
| count | Count rows with optional filter |
list_sources
List all configured data sources.
list_sources: {}
Response:
{
  "sources": [
    {"name": "flow", "description": "Network flow data"},
    {"name": "ip_reputation", "description": "IP reputation scores"}
  ]
}
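Because the response is plain JSON, a client can extract the source names before calling the other tools. A minimal Python sketch, assuming the response body arrives as a JSON string shaped like the example above; how that string reaches your code depends on the MCP client and is not shown, and `source_names` is a hypothetical helper:

```python
import json
from typing import List

# Example response body, copied from the list_sources example.
response_text = """
{
  "sources": [
    {"name": "flow", "description": "Network flow data"},
    {"name": "ip_reputation", "description": "IP reputation scores"}
  ]
}
"""

def source_names(body: str) -> List[str]:
    """Return the name of every source in a list_sources response body."""
    payload = json.loads(body)
    return [entry["name"] for entry in payload["sources"]]

print(source_names(response_text))
```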
schema
Get column names and types for a data source.
schema:
  source: flow
  format: table
Parameters:
| Name | Required | Description |
|---|---|---|
| source | Yes | Data source name |
| format | No | Output format (default: table) |
query
Query a data source with filters and column selection, or with custom SQL.
Basic Query
query:
  source: flow
  columns: [saddr, daddr, sbytes, dbytes]
  filter: "sbytes > 1000000"
  limit: 50
  format: json
Parameters:
| Name | Required | Description |
|---|---|---|
| source | Yes | Data source name |
| columns | No | Columns to select (default: all) |
| filter | No | WHERE clause condition |
| order_by | No | ORDER BY clause |
| limit | No | Maximum rows |
| format | No | Output format |
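One way to read the parameter table is as the clauses of a single SELECT statement. The sketch below is an illustration only: `build_select` is a hypothetical helper, not part of Rockfish MCP, and the SQL the server actually generates may differ.

```python
from typing import Optional, Sequence

def build_select(
    source: str,
    columns: Optional[Sequence[str]] = None,
    filter_expr: Optional[str] = None,   # corresponds to the `filter` parameter
    order_by: Optional[str] = None,
    limit: Optional[int] = None,
) -> str:
    """Compose a SELECT statement from query-style parameters (hypothetical)."""
    # No column list means "all columns", matching the documented default.
    select_list = ", ".join(columns) if columns else "*"
    parts = [f"SELECT {select_list}", f"FROM {source}"]
    if filter_expr:
        parts.append(f"WHERE {filter_expr}")
    if order_by:
        parts.append(f"ORDER BY {order_by}")
    if limit is not None:
        parts.append(f"LIMIT {int(limit)}")
    return " ".join(parts)

# The Basic Query example, expressed through the helper.
print(build_select(
    "flow",
    columns=["saddr", "daddr", "sbytes", "dbytes"],
    filter_expr="sbytes > 1000000",
    limit=50,
))
```

Under these assumptions, the optional parameters simply add or omit a clause; leaving them all out yields `SELECT * FROM flow`.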
Custom SQL
Use the {source} placeholder to refer to the data source:
query:
  source: flow
  sql: |
    SELECT saddr, COUNT(*) as connection_count, SUM(sbytes) as total_bytes
    FROM {source}
    GROUP BY saddr
    ORDER BY total_bytes DESC
    LIMIT 10
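To see what the placeholder does, the sketch below substitutes a table name for {source} and runs the statement. It is a stand-in rather than the server's implementation: it uses Python's built-in sqlite3 with three made-up rows instead of Parquet data, and `expand_source` is a hypothetical helper.

```python
import sqlite3

SQL_TEMPLATE = """
SELECT saddr, COUNT(*) AS connection_count, SUM(sbytes) AS total_bytes
FROM {source}
GROUP BY saddr
ORDER BY total_bytes DESC
LIMIT 10
"""

def expand_source(sql: str, table: str) -> str:
    """Replace every {source} placeholder with a table name (hypothetical)."""
    # str.replace is used instead of str.format so that any other braces
    # in the SQL text are left untouched.
    return sql.replace("{source}", table)

# In-memory table standing in for the flow data source (made-up rows).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flow (saddr TEXT, sbytes INTEGER)")
conn.executemany(
    "INSERT INTO flow VALUES (?, ?)",
    [("10.0.0.1", 500), ("10.0.0.2", 2000), ("10.0.0.1", 700)],
)

rows = conn.execute(expand_source(SQL_TEMPLATE, "flow")).fetchall()
print(rows)
```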
Time-based Queries
query:
  source: flow
  filter: "stime >= '2025-01-01' AND stime < '2025-01-02'"
  columns: [stime, saddr, daddr, proto]
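The filter above uses a half-open range (>= start, < next day), so a flow that starts exactly at midnight falls into one day only. A small sketch for generating such a filter string; `day_filter` is a hypothetical helper, and it assumes stime compares correctly against ISO date strings, as in the example:

```python
from datetime import date, timedelta

def day_filter(day: date, column: str = "stime") -> str:
    """Build a half-open filter covering one calendar day (hypothetical helper)."""
    next_day = day + timedelta(days=1)
    return (
        f"{column} >= '{day.isoformat()}' "
        f"AND {column} < '{next_day.isoformat()}'"
    )

print(day_filter(date(2025, 1, 1)))
```

Computing the upper bound with timedelta handles month and year boundaries without special cases.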
Protocol Filtering
query:
  source: flow
  filter: "proto = 'TCP' AND dport = 443"
  columns: [saddr, daddr, ndpi_appid]
aggregate
Group and aggregate data.
aggregate:
  source: flow
  group_by: [dport]
  aggregations:
    - function: sum
      column: sbytes
      alias: total_bytes
    - function: count
      alias: connection_count
  filter: "proto = 'TCP'"
  order_by: "total_bytes DESC"
  limit: 20
  format: table
Parameters:
| Name | Required | Description |
|---|---|---|
| source | Yes | Data source name |
| group_by | Yes | Columns to group by |
| aggregations | Yes | Aggregation functions |
| filter | No | WHERE clause |
| order_by | No | ORDER BY clause |
| limit | No | Maximum rows |
| format | No | Output format |
Aggregation Functions
| Function | Description |
|---|---|
| count | Count rows |
| sum | Sum values |
| avg | Average |
| min | Minimum |
| max | Maximum |
| count_distinct | Count unique values |
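The aggregate parameters can be read as a GROUP BY statement. The sketch below shows one plausible translation; `build_aggregate` is a hypothetical helper, the mapping of count_distinct to COUNT(DISTINCT ...) is an assumption, and the SQL the server really produces may differ.

```python
from typing import Dict, List, Optional, Sequence

# Assumed SQL templates for the documented aggregation functions.
SQL_TEMPLATES = {
    "count": "COUNT({col})",
    "sum": "SUM({col})",
    "avg": "AVG({col})",
    "min": "MIN({col})",
    "max": "MAX({col})",
    "count_distinct": "COUNT(DISTINCT {col})",
}

def build_aggregate(
    source: str,
    group_by: Sequence[str],
    aggregations: List[Dict[str, str]],
    filter_expr: Optional[str] = None,   # corresponds to the `filter` parameter
    order_by: Optional[str] = None,
    limit: Optional[int] = None,
) -> str:
    """Compose a GROUP BY statement from aggregate-style parameters (hypothetical)."""
    select_items = list(group_by)
    for agg in aggregations:
        # count without a column is treated as COUNT(*), as in the examples;
        # the other functions are expected to name a column.
        col = agg.get("column", "*")
        expr = SQL_TEMPLATES[agg["function"]].format(col=col)
        alias = agg.get("alias")
        select_items.append(f"{expr} AS {alias}" if alias else expr)
    parts = [f"SELECT {', '.join(select_items)}", f"FROM {source}"]
    if filter_expr:
        parts.append(f"WHERE {filter_expr}")
    parts.append(f"GROUP BY {', '.join(group_by)}")
    if order_by:
        parts.append(f"ORDER BY {order_by}")
    if limit is not None:
        parts.append(f"LIMIT {int(limit)}")
    return " ".join(parts)

sql = build_aggregate(
    "flow",
    group_by=["dport"],
    aggregations=[
        {"function": "sum", "column": "sbytes", "alias": "total_bytes"},
        {"function": "count", "alias": "connection_count"},
    ],
    filter_expr="proto = 'TCP'",
    order_by="total_bytes DESC",
    limit=20,
)
print(sql)
```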
Examples
Top destination ports by traffic:
aggregate:
  source: flow
  group_by: [dport]
  aggregations:
    - function: sum
      column: sbytes + dbytes
      alias: total_bytes
    - function: count
      alias: flows
  order_by: "total_bytes DESC"
  limit: 10
Flows by country (requires GeoIP):
aggregate:
  source: flow
  group_by: [scountry, dcountry]
  aggregations:
    - function: count
      alias: flow_count
  filter: "scountry IS NOT NULL"
sample
Get random sample rows.
sample:
  source: flow
  n: 10
  format: json
Parameters:
| Name | Required | Description |
|---|---|---|
| source | Yes | Data source name |
| n | No | Number of rows (default: 10) |
| format | No | Output format |
count
Count rows with optional filter.
count:
  source: flow
  filter: "ndpi_risk_score > 50"
Parameters:
| Name | Required | Description |
|---|---|---|
| source | Yes | Data source name |
| filter | No | WHERE clause |
Output Formats
| Format | Description |
|---|---|
| json | Pretty-printed JSON array |
| jsonl / json_lines / ndjson | Newline-delimited JSON |
| csv | CSV with header |
| table / text | ASCII table |
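To make the difference between the first three formats concrete, the sketch below renders the same two made-up rows as a pretty-printed JSON array, as newline-delimited JSON, and as CSV with a header. It uses only Python's standard library; the exact whitespace and quoting produced by Rockfish MCP may differ.

```python
import csv
import io
import json
from typing import Dict, List

# Made-up rows for illustration.
rows = [
    {"saddr": "10.0.0.1", "sbytes": 1200},
    {"saddr": "10.0.0.2", "sbytes": 2000},
]

def to_json(records: List[Dict]) -> str:
    """One JSON array holding every row, indented for readability."""
    return json.dumps(records, indent=2)

def to_jsonl(records: List[Dict]) -> str:
    """One compact JSON object per line."""
    return "\n".join(json.dumps(record) for record in records)

def to_csv(records: List[Dict]) -> str:
    """Header line followed by one line per row (assumes at least one row)."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer, fieldnames=list(records[0].keys()), lineterminator="\n"
    )
    writer.writeheader()
    writer.writerows(records)
    return buffer.getvalue()

print(to_json(rows))
print(to_jsonl(rows))
print(to_csv(rows))
```

Newline-delimited JSON is convenient for streaming because each line parses on its own, while the array form has to be read in full before it is valid JSON.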
Common Query Patterns
Top Talkers
query:
  source: flow
  sql: |
    SELECT saddr,
           COUNT(*) as flows,
           SUM(sbytes) as sent,
           SUM(dbytes) as received
    FROM {source}
    GROUP BY saddr
    ORDER BY sent + received DESC
    LIMIT 20
DNS Traffic
query:
  source: flow
  filter: "dport = 53 OR sport = 53"
  columns: [stime, saddr, daddr, sbytes, dbytes]
High-Risk Flows
query:
  source: flow
  filter: "ndpi_risk_score > 100"
  columns: [stime, saddr, daddr, ndpi_appid, ndpi_risk_list]
Long-Duration Flows
query:
  source: flow
  filter: "dur > 3600000"  # > 1 hour in ms
  columns: [stime, etime, dur, saddr, daddr, sbytes, dbytes]
  order_by: "dur DESC"
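The threshold 3600000 is one hour expressed in milliseconds (60 * 60 * 1000), since the comment in the example indicates that dur is measured in ms. A small sketch for deriving such thresholds instead of hard-coding them; `to_milliseconds` is a hypothetical helper:

```python
from datetime import timedelta

def to_milliseconds(duration: timedelta) -> int:
    """Convert a timedelta to whole milliseconds."""
    # Dividing one timedelta by another with // yields an integer.
    return duration // timedelta(milliseconds=1)

threshold = to_milliseconds(timedelta(hours=1))
print(f"dur > {threshold}")
```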
External Traffic
query:
  source: flow
  filter: "NOT (saddr LIKE '10.%' OR saddr LIKE '192.168.%')"
  columns: [saddr, daddr, scountry, dcountry]
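Note that the LIKE patterns above cover 10.0.0.0/8 and 192.168.0.0/16 but not the third RFC 1918 private block, 172.16.0.0/12, which is awkward to express with string prefixes. If that matters for your data, one option is to classify addresses on the client after the query returns. A sketch using Python's standard ipaddress module; `is_rfc1918` is a hypothetical helper and assumes saddr values are IP address strings:

```python
import ipaddress

# The three private IPv4 blocks defined by RFC 1918.
RFC1918_NETWORKS = [
    ipaddress.ip_network(block)
    for block in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
]

def is_rfc1918(addr: str) -> bool:
    """Return True if addr lies inside one of the RFC 1918 private blocks."""
    ip = ipaddress.ip_address(addr)
    return any(ip in network for network in RFC1918_NETWORKS)

# Made-up source addresses; keep only the ones outside the private blocks.
addresses = ["10.1.2.3", "172.16.5.4", "192.168.1.1", "8.8.8.8"]
external = [addr for addr in addresses if not is_rfc1918(addr)]
print(external)
```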
Application Distribution
aggregate:
  source: flow
  group_by: [ndpi_appid]
  aggregations:
    - function: count
      alias: flows
    - function: sum
      column: sbytes + dbytes
      alias: bytes
  filter: "ndpi_appid IS NOT NULL"
  order_by: "bytes DESC"
  limit: 20