# Data Pipeline

Rockfish Detect processes data through a series of stages, each producing artifacts used by subsequent stages.
## Pipeline Stages

```
sample --> extract --> rank --> train --> score
```

| Stage | Command | Input | Output |
|---|---|---|---|
| Sample | sample | Raw flow Parquet | Sampled Parquet |
| Extract | extract | Sampled Parquet | Normalization tables |
| Rank | rank | Normalization tables | Feature rankings |
| Train | train | Sampled + Normalization | Model files |
| Score | score | Raw flows + Model | Anomaly scores |
## 1. Sampling

Randomly samples flow data to reduce volume while maintaining statistical properties.
```bash
# Sample specific date
rockfish_detect -c config.yaml sample --date 2025-01-28

# Sample last N days
rockfish_detect -c config.yaml sample --days 7

# Clear state and resample all
rockfish_detect -c config.yaml sample --clear
```
### Input Path

```
s3://<bucket>/<observation>/v2/year=YYYY/month=MM/day=DD/*.parquet
```
### Output Path

```
s3://<bucket>/<observation>/sample/sample-YYYY-MM-DD.parquet
```
### Configuration

```yaml
sampling:
  sample_percent: 10.0  # 10% of rows
  retention_days: 7     # Keep 7 days of samples
```
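As an illustration of what `sample_percent` implies, here is a minimal Python sketch of percent-based row sampling; the function and paths are hypothetical, not the tool's actual implementation:

```python
import pandas as pd

def sample_day(raw_path: str, out_path: str, sample_percent: float) -> None:
    """Hypothetical sketch: draw a uniform random subset of one day's flows.

    raw_path and out_path stand in for the S3 locations shown above.
    """
    df = pd.read_parquet(raw_path)
    # pandas expects a fraction, so 10.0 percent becomes 0.10
    df.sample(frac=sample_percent / 100.0).to_parquet(out_path, index=False)
```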
### State Tracking

Sampling maintains state to avoid reprocessing (sketched below):

- Tracks which dates have been sampled
- Skips dates already in the state file
- Use `--clear` to reset state
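Assuming a simple JSON state file of sampled dates (the actual state format is not documented here, so this is purely illustrative), the bookkeeping might look like:

```python
import json
from pathlib import Path

STATE_FILE = Path("sample_state.json")  # hypothetical location and format

def already_sampled(date: str) -> bool:
    """Return True if `date` (YYYY-MM-DD) is recorded as sampled."""
    if not STATE_FILE.exists():
        return False
    return date in json.loads(STATE_FILE.read_text()).get("sampled_dates", [])

def mark_sampled(date: str) -> None:
    """Record `date` so later runs skip it; --clear amounts to deleting this."""
    state = json.loads(STATE_FILE.read_text()) if STATE_FILE.exists() else {}
    state["sampled_dates"] = sorted(set(state.get("sampled_dates", [])) | {date})
    STATE_FILE.write_text(json.dumps(state))
```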
## 2. Feature Extraction

Builds normalization lookup tables for ML training.
```bash
# Extract features for all protocols
rockfish_detect -c config.yaml extract

# Specific protocol
rockfish_detect -c config.yaml extract -p tcp

# Sequential (not parallel)
rockfish_detect -c config.yaml extract --sequential
```
### Processing

For each field, extraction creates a normalization table (both cases are sketched after the lists below):
**Numeric fields** (dur, rtt, bytes, etc.):

- Histogram binning (quantile or equal-width)
- Maps raw values to bin indices
- Normalizes to the [0, 1] range

**Categorical fields** (proto, ports, IPs):

- Frequency counting
- Maps values to frequency scores
- Special handling for IPs (/24 truncation)
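A minimal Python sketch of both normalizations, assuming quantile binning for the numeric case; the helper names are illustrative and the actual table schemas may differ:

```python
import numpy as np
import pandas as pd

def numeric_table(values: pd.Series, num_bins: int = 10) -> pd.DataFrame:
    """Quantile-bin a numeric field and map bin indices into [0, 1]."""
    # Quantile edges adapt to skewed distributions better than equal-width bins.
    edges = np.unique(np.quantile(values.dropna(), np.linspace(0, 1, num_bins + 1)))
    bins = np.clip(np.searchsorted(edges, values, side="right") - 1,
                   0, len(edges) - 2)
    return pd.DataFrame({"value": values, "bin": bins,
                         "normalized": bins / max(len(edges) - 2, 1)})

def categorical_table(values: pd.Series) -> pd.DataFrame:
    """Map each categorical value to its relative frequency."""
    freq = values.value_counts(normalize=True)
    return freq.rename_axis("value").reset_index(name="frequency_score")
```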
### Output Path

```
s3://<bucket>/<observation>/extract/<protocol>/<field>.parquet
```
### Configuration

```yaml
features:
  num_bins: 10              # Histogram resolution
  histogram_type: quantile  # Better for skewed data
  ip_hash_modulus: 65536    # IP dimensionality reduction
```
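The /24 truncation plus `ip_hash_modulus` likely amounts to bucketing source networks into a bounded space. Here is one hedged reading of it in Python; the hash function choice is an assumption:

```python
import hashlib
import ipaddress

def ip_feature(ip: str, modulus: int = 65536) -> int:
    """Truncate an IPv4 address to its /24 network, then hash into
    `modulus` buckets to bound feature dimensionality."""
    network = ipaddress.ip_network(f"{ip}/24", strict=False)
    digest = hashlib.sha1(str(network.network_address).encode()).digest()
    return int.from_bytes(digest[:4], "big") % modulus
```

For example, `ip_feature("10.1.2.3")` and `ip_feature("10.1.2.99")` map to the same bucket because both fall in `10.1.2.0/24`.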
## 3. Feature Ranking

Ranks features by importance for model training.
```bash
# Rank using reconstruction error
rockfish_detect -c config.yaml rank

# Rank using SVD
rockfish_detect -c config.yaml rank -a svd

# Specific protocol
rockfish_detect -c config.yaml rank -p tcp
```
### Algorithms

| Algorithm | Description |
|---|---|
| reconstruction | Autoencoder reconstruction error (default) |
| svd | Singular Value Decomposition importance |
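As one plausible reading of the `svd` option, features can be scored by how strongly they load on the dominant singular vectors. This NumPy sketch is an assumption about the method, not the tool's code:

```python
import numpy as np

def svd_importance(features: np.ndarray) -> np.ndarray:
    """Score each column by its singular-value-weighted loadings,
    normalized so the top feature scores 1.0."""
    centered = features - features.mean(axis=0)
    _, s, vt = np.linalg.svd(centered, full_matrices=False)
    # Weight each right singular vector's loadings by its singular value.
    scores = (s[:, None] * np.abs(vt)).sum(axis=0)
    return scores / scores.max()
```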
### Output

```
s3://<bucket>/<observation>/extract/<protocol>/rockfish_rank.parquet
```
Contains importance scores (0-1) for each field.
### Using Rankings

```yaml
training:
  min_importance_score: 0.7  # Only use features above this
```
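Applying the floor is then a simple filter over the rankings file. This sketch assumes `field` and `importance` column names, which may differ from the actual schema:

```python
import pandas as pd

def select_features(rank_path: str, min_score: float = 0.7) -> list[str]:
    """Keep only fields whose importance clears the configured floor."""
    ranks = pd.read_parquet(rank_path)
    return ranks.loc[ranks["importance"] >= min_score, "field"].tolist()
```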
## 4. Model Training

Trains anomaly detection models on sampled data.
```bash
# Train HBOS model
rockfish_detect -c config.yaml train -a hbos

# Train hybrid model
rockfish_detect -c config.yaml train -a hybrid

# Train with ranked features only
rockfish_detect -c config.yaml train-ranked -n 10

# Specific protocol
rockfish_detect -c config.yaml train -p tcp
```
### Algorithms

**HBOS (Histogram-Based Outlier Score):**

- Fast and interpretable
- Inverse density scoring
- Good baseline algorithm (see the sketch below)
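The core HBOS idea fits in a few lines: bin each feature, then sum the log inverse densities of the bins each flow falls into. This is a generic Python sketch of the published algorithm, not Rockfish Detect's actual implementation:

```python
import numpy as np

def hbos_score(X: np.ndarray, num_bins: int = 10) -> np.ndarray:
    """Histogram-Based Outlier Score: flows landing in low-density
    bins accumulate high scores."""
    n, d = X.shape
    scores = np.zeros(n)
    for j in range(d):
        counts, edges = np.histogram(X[:, j], bins=num_bins)
        density = counts / counts.sum()
        idx = np.clip(np.searchsorted(edges, X[:, j], side="right") - 1,
                      0, num_bins - 1)
        # The epsilon keeps empty bins from producing infinities.
        scores += -np.log(density[idx] + 1e-9)
    return scores
```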
**Hybrid:**

- Combines HBOS + correlation + threat intel
- Weighted scoring model
- Better for complex environments
### Output

Models are saved to the configured directory:

```
<model_output_dir>/<protocol>_model.json
```
### Configuration

```yaml
training:
  algorithm: hbos
  model_output_dir: /var/lib/rockfish/models
  hbos:
    num_bins: 10
    fields: [dur, rtt, pcr, spkts, dpkts, sbytes, dbytes]
```
## 5. Flow Scoring

Scores flows using trained models.
```bash
# Score specific date
rockfish_detect -c config.yaml score -d 2025-01-28

# Score since timestamp
rockfish_detect -c config.yaml score --since 2025-01-28T00:00:00Z

# With severity threshold
rockfish_detect -c config.yaml score -t 0.8

# Limit results
rockfish_detect -c config.yaml score -n 1000

# Output to file
rockfish_detect -c config.yaml score -o anomalies.parquet
```
### Options

| Option | Description |
|---|---|
| -d, --date | Score specific date |
| --since | Score since timestamp |
| -p | Specific protocol |
| -t, --threshold | Minimum score threshold |
| -n, --limit | Maximum results |
| -o, --output | Output file path |
### Severity Classification

```yaml
# Percentile-based (default)
severity_mode: percentile
```

```yaml
# Fixed thresholds
severity_mode: fixed
severity_thresholds:
  low: 0.5
  medium: 0.7
  high: 0.85
  critical: 0.95
```
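The split between the two modes might look like the following sketch, assuming fixed mode compares raw scores against the thresholds while percentile mode applies the same cut points to each score's rank within the batch; these semantics are an assumption, not taken from the tool:

```python
import numpy as np

THRESHOLDS = {"LOW": 0.5, "MEDIUM": 0.7, "HIGH": 0.85, "CRITICAL": 0.95}

def classify_fixed(score: float) -> str:
    """Return the highest severity whose threshold the score clears."""
    level = "LOW"
    for name, threshold in THRESHOLDS.items():
        if score >= threshold:
            level = name
    return level

def classify_percentile(scores: np.ndarray) -> list[str]:
    """Rank scores within the batch, then apply the same cut points
    to the percentile ranks instead of the raw values."""
    ranks = scores.argsort().argsort() / max(len(scores) - 1, 1)
    return [classify_fixed(r) for r in ranks]
```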
### Output

```
s3://<bucket>/<observation>/score/score-YYYY-MM-DD.parquet
```

Each record includes:

- Original flow fields
- `anomaly_score` (0-1)
- `severity` (LOW, MEDIUM, HIGH, CRITICAL)
## Automated Pipeline

Run the complete pipeline with a single command:
```bash
# Full pipeline for today
rockfish_detect -c config.yaml auto

# Specific date
rockfish_detect -c config.yaml auto --date 2025-01-28

# Last 7 days
rockfish_detect -c config.yaml auto --days 7

# Stop on first error
rockfish_detect -c config.yaml auto --fail-fast
```
### Pipeline Order

1. Sample data
2. Extract features
3. Rank features
4. Train model
5. Score flows
## Reporting

Generate reports from scored data:
```bash
# Text report
rockfish_detect -c config.yaml report --date 2025-01-28

# JSON output
rockfish_detect -c config.yaml report -f json

# Filter by severity
rockfish_detect -c config.yaml report --min-severity HIGH

# Top N anomalies
rockfish_detect -c config.yaml report -n 50
```
### Output Formats

| Format | Description |
|---|---|
| text | Human-readable (default) |
| json | Machine-readable JSON |
| csv | CSV export |