Data Pipeline

Rockfish Detect processes data through a series of stages, each producing artifacts used by subsequent stages.

Pipeline Stages

sample --> extract --> rank --> train --> score
Stage     Command    Input                     Output
--------  ---------  ------------------------  --------------------
Sample    sample     Raw flow Parquet          Sampled Parquet
Extract   extract    Sampled Parquet           Normalization tables
Rank      rank       Normalization tables      Feature rankings
Train     train      Sampled + Normalization   Model files
Score     score      Raw flows + Model         Anomaly scores

1. Sampling

Randomly samples flow data to reduce volume while preserving its statistical properties.

# Sample specific date
rockfish_detect -c config.yaml sample --date 2025-01-28

# Sample last N days
rockfish_detect -c config.yaml sample --days 7

# Clear state and resample all
rockfish_detect -c config.yaml sample --clear

Input Path

s3://<bucket>/<observation>/v2/year=YYYY/month=MM/day=DD/*.parquet

Output Path

s3://<bucket>/<observation>/sample/sample-YYYY-MM-DD.parquet

Configuration

sampling:
  sample_percent: 10.0    # 10% of rows
  retention_days: 7       # Keep 7 days of samples

State Tracking

Sampling maintains state to avoid reprocessing (see the sketch after this list):

  • Tracks which dates have been sampled
  • Skips dates already in state file
  • Use --clear to reset state
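
A minimal sketch of how this kind of date-based state tracking can work. The state-file path, JSON format, and function names here are illustrative assumptions, not Rockfish Detect internals:

import json
from pathlib import Path

STATE_FILE = Path("sample_state.json")  # hypothetical location and format

def load_state() -> set[str]:
    """Return the set of dates (YYYY-MM-DD) already sampled."""
    if STATE_FILE.exists():
        return set(json.loads(STATE_FILE.read_text()))
    return set()

def mark_sampled(date: str) -> None:
    """Record a date so later runs skip it."""
    state = load_state()
    state.add(date)
    STATE_FILE.write_text(json.dumps(sorted(state)))

def clear_state() -> None:
    """What --clear implies: forget all sampled dates."""
    STATE_FILE.unlink(missing_ok=True)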

2. Feature Extraction

Builds normalization lookup tables for ML training.

# Extract features for all protocols
rockfish_detect -c config.yaml extract

# Specific protocol
rockfish_detect -c config.yaml extract -p tcp

# Sequential (not parallel)
rockfish_detect -c config.yaml extract --sequential

Processing

For each field, the extract stage creates a normalization table (sketched after these lists):

Numeric fields (dur, rtt, bytes, etc.):

  • Histogram binning (quantile or equal-width)
  • Maps raw values to bin indices
  • Normalizes to [0, 1] range

Categorical fields (proto, ports, IPs):

  • Frequency counting
  • Maps values to frequency scores
  • Special handling for IPs (/24 truncation)
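
The following sketch shows both transforms with pandas and NumPy. The function names, column names, and exact binning math are assumptions; it illustrates the shape of a normalization table, not the tool's actual implementation:

import numpy as np
import pandas as pd

def numeric_table(values: pd.Series, num_bins: int = 10) -> pd.DataFrame:
    """Quantile-bin a numeric field and normalize bin indices to [0, 1]."""
    edges = np.quantile(values.dropna(), np.linspace(0, 1, num_bins + 1))
    bins = np.searchsorted(edges[1:-1], values, side="right")  # bin index per value
    return pd.DataFrame({"value": values, "bin": bins,
                         "normalized": bins / (num_bins - 1)})

def categorical_table(values: pd.Series) -> pd.DataFrame:
    """Map each category to its relative frequency."""
    freq = values.value_counts(normalize=True)
    return pd.DataFrame({"value": freq.index, "score": freq.values})

def truncate_ip(ip: str) -> str:
    """Collapse an IPv4 address to its /24 network: 10.1.2.3 -> 10.1.2.0/24."""
    return ".".join(ip.split(".")[:3]) + ".0/24"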

Output Path

s3://<bucket>/<observation>/extract/<protocol>/<field>.parquet

Configuration

features:
  num_bins: 10              # Histogram resolution
  histogram_type: quantile  # Better for skewed data
  ip_hash_modulus: 65536    # IP dimensionality reduction

3. Feature Ranking

Ranks features by importance for model training.

# Rank using reconstruction error
rockfish_detect -c config.yaml rank

# Rank using SVD
rockfish_detect -c config.yaml rank -a svd

# Specific protocol
rockfish_detect -c config.yaml rank -p tcp

Algorithms

Algorithm       Description
--------------  ------------------------------------------
reconstruction  Autoencoder reconstruction error (default)
svd             Singular Value Decomposition importance

Output

s3://<bucket>/<observation>/extract/<protocol>/rockfish_rank.parquet

Contains importance scores (0-1) for each field.
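
For intuition, here is one common way to derive SVD-based importance: weight each feature's loadings by the singular values. Whether Rockfish Detect uses exactly this weighting is an assumption:

import numpy as np

def svd_importance(X: np.ndarray) -> np.ndarray:
    """Score each column of X (rows x features) by its weight in the
    dominant singular directions, scaled to [0, 1]."""
    Xc = X - X.mean(axis=0)                          # center each feature
    _, s, vt = np.linalg.svd(Xc, full_matrices=False)
    weights = (s[:, None] * np.abs(vt)).sum(axis=0)  # singular-value-weighted loadings
    return weights / weights.max()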

Using Rankings

training:
  min_importance_score: 0.7   # Only use features above this
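
In effect, this filter keeps only fields whose score clears the cutoff. A sketch, assuming the ranking Parquet has field and importance columns (the actual schema may differ):

import pandas as pd

ranks = pd.read_parquet("rockfish_rank.parquet")
selected = ranks.loc[ranks["importance"] >= 0.7, "field"].tolist()
print(f"training on {len(selected)} fields: {selected}")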

4. Model Training

Trains anomaly detection models on sampled data.

# Train HBOS model
rockfish_detect -c config.yaml train -a hbos

# Train hybrid model
rockfish_detect -c config.yaml train -a hybrid

# Train with ranked features only
rockfish_detect -c config.yaml train-ranked -n 10

# Specific protocol
rockfish_detect -c config.yaml train -p tcp

Algorithms

HBOS (Histogram-Based Outlier Score):

  • Fast, interpretable
  • Inverse density scoring (sketched below)
  • Good baseline algorithm

Hybrid:

  • Combines HBOS + correlation + threat intel
  • Weighted scoring model
  • Better for complex environments
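
To make HBOS's inverse-density scoring concrete, here is a minimal sketch; the bin handling and epsilon smoothing are illustrative assumptions:

import numpy as np

def hbos_score(X: np.ndarray, num_bins: int = 10) -> np.ndarray:
    """Sum of per-feature inverse log-densities for each row of X
    (rows x features); rarer rows receive higher scores."""
    scores = np.zeros(len(X))
    for j in range(X.shape[1]):
        col = X[:, j]
        hist, edges = np.histogram(col, bins=num_bins)
        density = hist / hist.sum()
        idx = np.searchsorted(edges[1:-1], col, side="right")  # bin per value
        scores += -np.log(density[idx] + 1e-9)  # rare bins -> large contributions
    return scores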

Output

Models are saved to the configured directory:

<model_output_dir>/<protocol>_model.json

Configuration

training:
  algorithm: hbos
  model_output_dir: /var/lib/rockfish/models

  hbos:
    num_bins: 10
    fields: [dur, rtt, pcr, spkts, dpkts, sbytes, dbytes]

5. Flow Scoring

Scores flows using trained models.

# Score specific date
rockfish_detect -c config.yaml score -d 2025-01-28

# Score since timestamp
rockfish_detect -c config.yaml score --since 2025-01-28T00:00:00Z

# With severity threshold
rockfish_detect -c config.yaml score -t 0.8

# Limit results
rockfish_detect -c config.yaml score -n 1000

# Output to file
rockfish_detect -c config.yaml score -o anomalies.parquet

Options

Option           Description
---------------  -------------------------------
-d, --date       Score a specific date
--since          Score flows since a timestamp
-p               Restrict to a specific protocol
-t, --threshold  Minimum score threshold
-n, --limit      Maximum number of results
-o, --output     Output file path

Severity Classification

# Percentile-based (default)
severity_mode: percentile

# Fixed thresholds
severity_mode: fixed
severity_thresholds:
  low: 0.5
  medium: 0.7
  high: 0.85
  critical: 0.95
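
A sketch of both modes. The fixed cutoffs come from the config above; the percentiles used for percentile mode and the boundary handling (>= vs >) are assumptions:

import numpy as np

FIXED = {"LOW": 0.5, "MEDIUM": 0.7, "HIGH": 0.85, "CRITICAL": 0.95}

def percentile_thresholds(scores: np.ndarray) -> dict[str, float]:
    """Derive cutoffs from the score distribution (illustrative percentiles)."""
    qs = {"LOW": 0.90, "MEDIUM": 0.95, "HIGH": 0.99, "CRITICAL": 0.999}
    return {name: float(np.quantile(scores, q)) for name, q in qs.items()}

def classify(score: float, thresholds: dict[str, float]) -> str:
    """Return the highest severity whose cutoff the score meets."""
    label = "NONE"
    for name in ("LOW", "MEDIUM", "HIGH", "CRITICAL"):
        if score >= thresholds[name]:
            label = name
    return label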

Output

s3://<bucket>/<observation>/score/score-YYYY-MM-DD.parquet

Includes:

  • Original flow fields
  • anomaly_score (0-1)
  • severity (LOW, MEDIUM, HIGH, CRITICAL)
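
Downstream consumers can read the scored Parquet directly. A sketch using the documented columns (the local filename is illustrative):

import pandas as pd

scores = pd.read_parquet("score-2025-01-28.parquet")
worst = scores[scores["severity"].isin(["HIGH", "CRITICAL"])]
print(worst.nlargest(10, "anomaly_score")[["anomaly_score", "severity"]])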

Automated Pipeline

Run the complete pipeline with a single command:

# Full pipeline for today
rockfish_detect -c config.yaml auto

# Specific date
rockfish_detect -c config.yaml auto --date 2025-01-28

# Last 7 days
rockfish_detect -c config.yaml auto --days 7

# Stop on first error
rockfish_detect -c config.yaml auto --fail-fast

Pipeline Order

  1. Sample data
  2. Extract features
  3. Rank features
  4. Train model
  5. Score flows
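
Conceptually, auto chains the five stage commands. A sketch of that orchestration; the subprocess wiring and per-stage flags are assumptions based on the commands documented above:

import subprocess
import sys

def run_pipeline(config: str, date: str, fail_fast: bool = False) -> None:
    """Run each stage in order; stop at the first failure when fail_fast is set."""
    stages = [
        ["sample", "--date", date],  # only sample and score are date-scoped
        ["extract"],
        ["rank"],
        ["train"],
        ["score", "-d", date],
    ]
    for args in stages:
        result = subprocess.run(["rockfish_detect", "-c", config, *args])
        if result.returncode != 0:
            print(f"{args[0]} failed (exit {result.returncode})", file=sys.stderr)
            if fail_fast:
                sys.exit(result.returncode)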

Reporting

Generate reports from scored data:

# Text report
rockfish_detect -c config.yaml report --date 2025-01-28

# JSON output
rockfish_detect -c config.yaml report -f json

# Filter by severity
rockfish_detect -c config.yaml report --min-severity HIGH

# Top N anomalies
rockfish_detect -c config.yaml report -n 50

Output Formats

Format  Description
------  -------------------------
text    Human-readable (default)
json    Machine-readable JSON
csv     CSV export