Tools & Queries

Rockfish MCP provides SQL-based tools for querying Parquet data.

Available Tools

| Tool | Description |
|------|-------------|
| list_sources | List configured data sources |
| schema | Get column names and types |
| query | Query with filters and column selection |
| aggregate | Group and aggregate data |
| sample | Get random sample rows |
| count | Count rows with optional filter |

list_sources

List all configured data sources.

list_sources: {}

Response:

{
  "sources": [
    {"name": "flow", "description": "Network flow data"},
    {"name": "ip_reputation", "description": "IP reputation scores"}
  ]
}

schema

Get column names and types for a data source.

schema:
  source: flow
  format: table

Parameters:

| Name | Required | Description |
|------|----------|-------------|
| source | Yes | Data source name |
| format | No | Output format (default: table) |
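
The same call works against any configured source. As a minimal sketch, the request below inspects the ip_reputation source from the list_sources response and asks for JSON instead of the default table. It assumes schema accepts the format values listed under Output Formats, which this page does not state explicitly.

```yaml
# Sketch: inspect the second source from the list_sources example.
# Assumption: schema accepts `json` like the other tools do.
schema:
  source: ip_reputation
  format: json
```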

query

Query a data source with filtering and column selection, or with custom SQL.

Basic Query

query:
  source: flow
  columns: [saddr, daddr, sbytes, dbytes]
  filter: "sbytes > 1000000"
  limit: 50
  format: json

Parameters:

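| Name | Required | Description |
|------|----------|-------------|
| source | Yes | Data source name |
| columns | No | Columns to select (default: all) |
| filter | No | WHERE clause condition |
| order_by | No | ORDER BY clause |
| limit | No | Maximum rows |
| format | No | Output format |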
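
The basic example does not use order_by. The sketch below adds it to the same request so the largest flows come first. The "column DESC" form mirrors the order_by values used elsewhere on this page.

```yaml
# Sketch: flows with more than 1,000,000 source bytes, largest first.
query:
  source: flow
  columns: [saddr, daddr, sbytes, dbytes]
  filter: "sbytes > 1000000"
  order_by: "sbytes DESC"   # same form as the order_by values used elsewhere
  limit: 50
  format: json
```

Without order_by, a limit may return an arbitrary subset of the matching rows, so pairing the two is usually what you want for "top N" questions.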

Custom SQL

Use the {source} placeholder in the FROM clause to refer to the data source named in source:

query:
  source: flow
  sql: |
    SELECT saddr, COUNT(*) as connection_count, SUM(sbytes) as total_bytes
    FROM {source}
    GROUP BY saddr
    ORDER BY total_bytes DESC
    LIMIT 10
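
Custom SQL is useful when a question needs more than one filter stage. The following sketch finds destinations contacted by many distinct sources on port 443. It assumes the underlying SQL engine accepts standard WHERE, HAVING, and COUNT(DISTINCT ...) syntax, which this page does not confirm; the threshold of 10 is arbitrary.

```yaml
# Sketch: destinations on port 443 reached by more than 10 distinct sources.
# Assumption: the SQL engine supports HAVING and COUNT(DISTINCT ...).
query:
  source: flow
  sql: |
    SELECT daddr, COUNT(DISTINCT saddr) AS unique_sources
    FROM {source}
    WHERE dport = 443
    GROUP BY daddr
    HAVING COUNT(DISTINCT saddr) > 10
    ORDER BY unique_sources DESC
    LIMIT 10
```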

Time-based Queries

query:
  source: flow
  filter: "stime >= '2025-01-01' AND stime < '2025-01-02'"
  columns: [stime, saddr, daddr, proto]
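
The filter above uses a half-open interval: the start is included (>=) and the end is excluded (<), so consecutive windows never count the same flow twice. A minimal sketch of the same pattern for a one-hour window follows. It assumes stime can be compared against 'YYYY-MM-DD HH:MM:SS' literals, which depends on the column type reported by schema.

```yaml
# Sketch: one-hour half-open window, oldest flows first.
# Assumption: stime compares against 'YYYY-MM-DD HH:MM:SS' literals.
query:
  source: flow
  filter: "stime >= '2025-01-01 08:00:00' AND stime < '2025-01-01 09:00:00'"
  columns: [stime, saddr, daddr, proto]
  order_by: "stime"
```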

Protocol Filtering

query:
  source: flow
  filter: "proto = 'TCP' AND dport = 443"
  columns: [saddr, daddr, ndpi_appid]

aggregate

Group and aggregate data.

aggregate:
  source: flow
  group_by: [dport]
  aggregations:
    - function: sum
      column: sbytes
      alias: total_bytes
    - function: count
      alias: connection_count
  filter: "proto = 'TCP'"
  order_by: "total_bytes DESC"
  limit: 20
  format: table

Parameters:

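| Name | Required | Description |
|------|----------|-------------|
| source | Yes | Data source name |
| group_by | Yes | Columns to group by |
| aggregations | Yes | Aggregation functions |
| filter | No | WHERE clause |
| order_by | No | ORDER BY clause |
| limit | No | Maximum rows |
| format | No | Output format (used in the example above) |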

Aggregation Functions

| Function | Description |
|----------|-------------|
| count | Count rows |
| sum | Sum values |
| avg | Average |
| min | Minimum |
| max | Maximum |
| count_distinct | Count unique values |

Examples

Top destination ports by traffic:

aggregate:
  source: flow
  group_by: [dport]
  aggregations:
    - function: sum
      column: sbytes + dbytes
      alias: total_bytes
    - function: count
      alias: flows
  order_by: "total_bytes DESC"
  limit: 10

Flows by country (requires GeoIP):

aggregate:
  source: flow
  group_by: [scountry, dcountry]
  aggregations:
    - function: count
      alias: flow_count
  filter: "scountry IS NOT NULL"
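
count_distinct is listed among the aggregation functions but not shown in the examples above. A minimal sketch follows, assuming it takes a column the same way sum does: it counts how many distinct destinations each source contacted, which is a common way to spot scanning behavior.

```yaml
# Sketch: distinct destinations per source address.
# Assumption: count_distinct takes `column` like sum does.
aggregate:
  source: flow
  group_by: [saddr]
  aggregations:
    - function: count_distinct
      column: daddr
      alias: unique_destinations
    - function: count
      alias: flows
  order_by: "unique_destinations DESC"
  limit: 20
```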

sample

Get random sample rows.

sample:
  source: flow
  n: 10
  format: json

Parameters:

| Name | Required | Description |
|------|----------|-------------|
| source | Yes | Data source name |
| n | No | Number of rows (default: 10) |
| format | No | Output format |

count

Count rows with optional filter.

count:
  source: flow
  filter: "ndpi_risk_score > 50"

Parameters:

| Name | Required | Description |
|------|----------|-------------|
| source | Yes | Data source name |
| filter | No | WHERE clause |
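
Because filter is optional, omitting it should return the total number of rows in the source. The sketch below shows that form; comparing its result with a filtered count gives the share of rows that match a condition.

```yaml
# Sketch: total row count for the source, with no filter applied.
count:
  source: flow
```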

Output Formats

| Format | Description |
|--------|-------------|
| json | Pretty-printed JSON array |
| jsonl / json_lines / ndjson | Newline-delimited JSON |
| csv | CSV with header |
| table / text | ASCII table |
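
The format is chosen per call through the format parameter. As a sketch, the request below asks for a small sample as CSV, which is convenient for pasting into a spreadsheet. It assumes that names sharing a row in the table above (for example jsonl, json_lines, and ndjson) are interchangeable aliases.

```yaml
# Sketch: five sample rows as CSV with a header line.
sample:
  source: flow
  n: 5
  format: csv   # swap in jsonl for one JSON object per line
```

Newline-delimited JSON is usually the easier choice for streaming or line-oriented tools, while the json format returns a single array that must be parsed as a whole.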

Common Query Patterns

Top Talkers

query:
  source: flow
  sql: |
    SELECT saddr,
           COUNT(*) as flows,
           SUM(sbytes) as sent,
           SUM(dbytes) as received
    FROM {source}
    GROUP BY saddr
    ORDER BY sent + received DESC
    LIMIT 20

DNS Traffic

query:
  source: flow
  filter: "dport = 53 OR sport = 53"
  columns: [stime, saddr, daddr, sbytes, dbytes]

High-Risk Flows

query:
  source: flow
  filter: "ndpi_risk_score > 100"
  columns: [stime, saddr, daddr, ndpi_appid, ndpi_risk_list]

Long-Duration Flows

query:
  source: flow
  filter: "dur > 3600000"  # > 1 hour in ms
  columns: [stime, etime, dur, saddr, daddr, sbytes, dbytes]
  order_by: "dur DESC"

External Traffic

query:
  source: flow
  filter: "NOT (saddr LIKE '10.%' OR saddr LIKE '192.168.%')"
  columns: [saddr, daddr, scountry, dcountry]
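
The filter above excludes 10.0.0.0/8 and 192.168.0.0/16 but not the third RFC 1918 private range, 172.16.0.0/12, so sources in 172.16.x.x through 172.31.x.x still appear as external. A sketch that also excludes that range follows. It keeps the LIKE approach of the original and therefore assumes saddr is stored as a dotted-quad IPv4 string; IPv6 addresses are not handled.

```yaml
# Sketch: exclude all three RFC 1918 private IPv4 ranges.
# Assumption: saddr is a dotted-quad string, as the LIKE filter above implies.
query:
  source: flow
  filter: >-
    NOT (saddr LIKE '10.%' OR saddr LIKE '192.168.%'
    OR saddr LIKE '172.16.%' OR saddr LIKE '172.17.%'
    OR saddr LIKE '172.18.%' OR saddr LIKE '172.19.%'
    OR saddr LIKE '172.20.%' OR saddr LIKE '172.21.%'
    OR saddr LIKE '172.22.%' OR saddr LIKE '172.23.%'
    OR saddr LIKE '172.24.%' OR saddr LIKE '172.25.%'
    OR saddr LIKE '172.26.%' OR saddr LIKE '172.27.%'
    OR saddr LIKE '172.28.%' OR saddr LIKE '172.29.%'
    OR saddr LIKE '172.30.%' OR saddr LIKE '172.31.%')
  columns: [saddr, daddr, scountry, dcountry]
```

The `>-` folded scalar joins the lines into a single string with spaces, so the filter reaches the tool as one WHERE condition.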

Application Distribution

aggregate:
  source: flow
  group_by: [ndpi_appid]
  aggregations:
    - function: count
      alias: flows
    - function: sum
      column: sbytes + dbytes
      alias: bytes
  filter: "ndpi_appid IS NOT NULL"
  order_by: "bytes DESC"
  limit: 20