
A Substreams package that retrieves events and instructions from both Pump.fun Bonding Curve and Pump AMM programs in real-time.
```mermaid
flowchart LR
    subgraph Solana["Solana Mainnet"]
        PUMP["fa:fa-rocket pump.fun<br/>Bonding Curve"]
        AMM["fa:fa-exchange-alt Pump AMM"]
    end
    subgraph Substreams["Substreams"]
        MAP["fa:fa-filter map_db_out"]
    end
    subgraph Sink["substreams-sink-sql"]
        PG["fa:fa-database PostgreSQL"]
        PGWEB["fa:fa-globe pgweb"]
    end
    PUMP --> MAP
    AMM --> MAP
    MAP --> PG
    PG --> PGWEB
```
| Program | Program ID | Description |
|---|---|---|
| pump.fun Bonding Curve | `6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P` | Token creation and initial trading (bonding curve) |
| Pump AMM | `pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA` | AMM trading after graduation |
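For orientation, the two program IDs above could be referenced as constants when filtering instructions. This is an illustrative sketch only; the constant and function names are not taken from this repo:

```rust
// Illustrative: the two Pump program IDs as constants plus a simple filter.
const PUMP_BONDING_CURVE_PROGRAM_ID: &str = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P";
const PUMP_AMM_PROGRAM_ID: &str = "pAMMBay6oceH9fJKBRHGP5D4sWpmSwMn52FMfXEA".trim_end(); // see note below

/// Returns true when an instruction's program ID belongs to either Pump program.
fn is_pump_program(program_id: &str) -> bool {
    program_id == PUMP_BONDING_CURVE_PROGRAM_ID || program_id == "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA"
}
```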
This project uses two code generation mechanisms:
**Protobuf messages (`proto/` → `src/pb/`)**: User-defined protobuf messages in the `proto/` directory are compiled to Rust code using `substreams protogen`.
```
proto/
├── program.proto                      # Domain-specific messages (TradeEvent, TokenCreated, etc.)
└── database.proto                     # DatabaseChanges for SQL sink
        ↓ substreams protogen
src/pb/
├── mod.rs
├── substreams.v1.program.rs           # Generated from program.proto
├── sf.substreams.sink.database.v1.rs  # Generated from database.proto
└── ...                                # (other generated files)
```
Regenerate proto files:
```bash
substreams protogen substreams.yaml --exclude-paths=sf/substreams/rpc,sf/substreams/v1,google
```
**IDL bindings (`idls/` → `src/idl/`)**: Program IDL (Interface Definition Language) files from Anchor-based Solana programs are used to generate type-safe Rust bindings via the `declare_program!()` macro.
```
idls/
├── pump.json      # pump.fun Bonding Curve program IDL
└── pump_amm.json  # Pump AMM program IDL
        ↓ declare_program!() macro (compile-time)
src/idl/
└── mod.rs         # Declares pump and pump_amm modules
```
How it works:
The declare_program!() macro from anchor-lang automatically reads the IDL JSON files at compile time and generates:
- `events` module: Type-safe event structs (e.g., `CreateEvent`, `TradeEvent`)
- `types` module: Custom types defined in the program
- `accounts` module: Account data types
- `program` module: Program ID constant

```rust
// src/idl/mod.rs
pub mod pump {
    use anchor_lang::declare_program;
    declare_program!(pump); // Reads idls/pump.json
    pub use self::pump::*;
}

pub mod pump_amm {
    use anchor_lang::declare_program;
    declare_program!(pump_amm); // Reads idls/pump_amm.json
    pub use self::pump_amm::*;
}
```
Usage in code:
```rust
use crate::idl::pump::events::CreateEvent;
use crate::idl::pump_amm::events::BuyEvent;

// Decode events from instruction data
let create_event = CreateEvent::try_from_slice(&event_data)?;
```
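As a slightly fuller, hedged sketch: Anchor programs that emit events via self-CPI typically prefix the Borsh-encoded event fields with an 8-byte event-CPI marker and the 8-byte event discriminator. Assuming that layout (the helper name below is illustrative, not from this repo), decoding could look like this:

```rust
use anchor_lang::AnchorDeserialize; // brings try_from_slice into scope

use crate::idl::pump::events::CreateEvent;

/// Hedged sketch: decode a CreateEvent from Anchor self-CPI instruction data,
/// assuming two 8-byte discriminators precede the Borsh payload.
fn decode_create_event(inner_ix_data: &[u8]) -> Option<CreateEvent> {
    // Skip the event-CPI marker and the event discriminator.
    let payload = inner_ix_data.get(16..)?;
    CreateEvent::try_from_slice(payload).ok()
}
```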
Reference: Anchor declare_program documentation
This project follows the Substreams best practices:
```
Block → map (facts extraction)
      → store (minimal state)
      → map (normalize → DatabaseChanges)
      → SQL sink
      → DB aggregates
      → Rule engine (outside Substreams)
```
Key principles:
- `map` modules: Stateless, pure functions for event extraction (see the sketch below)
- `store` modules: Minimal state only (e.g., `holder_count`, `known_mints`)
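A minimal sketch of what these two module kinds look like in Rust. Module names follow the ones used in this project, but the wrapper message, its fields, and the handler bodies are assumptions for illustration only:

```rust
use substreams::errors::Error;
use substreams::store::{StoreAdd, StoreAddInt64, StoreNew};
use substreams_solana::pb::sf::solana::r#type::v1::Block;

// Hypothetical wrapper message assumed to exist in the generated pb code.
use crate::pb::substreams::v1::program::TokenAccountBalanceChanges;

// Map module: a stateless, pure function over the raw block.
#[substreams::handlers::map]
fn map_token_account_changes(block: Block) -> Result<TokenAccountBalanceChanges, Error> {
    let mut out = TokenAccountBalanceChanges::default();
    for tx in block.transactions.iter() {
        // Compare pre/post token balances and push normalized
        // TokenAccountBalanceChange messages into `out`.
        let _ = tx;
    }
    Ok(out)
}

// Store module: minimal state, e.g. a running holder count per mint.
#[substreams::handlers::store]
fn store_holder_count(changes: TokenAccountBalanceChanges, store: StoreAddInt64) {
    for change in changes.changes {
        // +1 when a wallet balance goes from zero to non-zero, -1 for the reverse
        // (`holder_delta` is a hypothetical field for this sketch).
        store.add(0, format!("holder_count:{}", change.mint_address), change.holder_delta);
    }
}
```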
Events from the pump.fun Bonding Curve program (`idls/pump.json`):

| Event | Description |
|---|---|
| `CreateEvent` | New token creation |
| `TradeEvent` | Buy/sell transactions |
| `CompleteEvent` | Bonding curve completion (Graduate) |
| `SetParamsEvent` | Parameter settings |
Events from the Pump AMM program (`idls/pump_amm.json`):

| Event | Description |
|---|---|
| `BuyEvent` | AMM purchase |
| `SellEvent` | AMM sale |
| `CreatePoolEvent` | Pool creation |
| `DepositEvent` | LP addition |
| `WithdrawEvent` | LP removal |
| `CollectCoinCreatorFeeEvent` | Creator fee collection |
Normalized output messages (`proto/program.proto`):

| Message | Description |
|---|---|
| `TokenCreated` | Normalized token creation event |
| `TradeEvent` | Unified trade event (Bonding + AMM) |
| `TokenAccountBalanceChange` | Balance changes for holder_count |
| `WalletLabel` | Detection facts (sniper, bundler, insider) |
```mermaid
flowchart TD
    subgraph Input["Solana Blocks"]
        BLOCKS["solana:blocks_without_votes"]
    end
    subgraph Maps["Map Modules"]
        CREATE["map_pump_create"]
        BONDING["map_pump_bonding_trades"]
        AMM["map_pump_amm_trades"]
        TAC["map_token_account_changes"]
    end
    subgraph Stores["Store Modules"]
        MINTS["store_known_mints"]
        TRADE["store_mint_has_trade"]
        CREATOR["store_token_creator"]
        HOLDER["store_holder_count"]
    end
    subgraph Detect["Detection Modules"]
        SNIPER["map_detect_sniper"]
        BUNDLER["map_detect_bundler"]
        INSIDER["map_detect_insider"]
    end
    subgraph Sink["Sink Module"]
        DB["map_db_out"]
    end
    BLOCKS --> CREATE
    BLOCKS --> BONDING
    BLOCKS --> AMM
    BLOCKS --> TAC
    CREATE --> MINTS
    CREATE --> CREATOR
    BONDING --> TRADE
    AMM --> TRADE
    MINTS --> TRADE
    MINTS --> TAC
    TRADE --> TAC
    TAC --> HOLDER
    BONDING --> SNIPER
    MINTS --> SNIPER
    BONDING --> BUNDLER
    TAC --> INSIDER
    CREATOR --> INSIDER
    CREATE --> DB
    BONDING --> DB
    AMM --> DB
    TAC --> DB
    SNIPER --> DB
    BUNDLER --> DB
    INSIDER --> DB
    HOLDER --> DB
```
The complete data flow from Solana blocks to PostgreSQL:
```mermaid
flowchart LR
    subgraph A["Solana"]
        B["Blocks (without votes)"]
    end
    subgraph S["Substreams"]
        M1["map_pump_create"]
        M2["map_pump_bonding_trades"]
        M3["map_pump_amm_trades"]
        M4["map_token_account_changes"]
        ST1["store_known_mints"]
        ST2["store_mint_has_trade"]
        ST3["store_token_creator"]
        ST4["store_holder_count"]
        D1["map_detect_sniper"]
        D2["map_detect_insider"]
        D3["map_detect_bundler"]
        OUT["map_db_out (DatabaseChanges)"]
    end
    subgraph DB["PostgreSQL (sink)"]
        T["tables"]
    end
    B --> M1
    B --> M2
    B --> M3
    B --> M4
    M1 --> ST1
    M1 --> ST3
    M2 --> ST2
    M3 --> ST2
    ST1 --> ST2
    M4 --> ST4
    M2 --> D1
    ST1 --> D1
    M4 --> D2
    ST3 --> D2
    M2 --> D3
    M1 --> OUT
    M2 --> OUT
    M3 --> OUT
    M4 --> OUT
    D1 --> OUT
    D2 --> OUT
    D3 --> OUT
    ST4 --> OUT
    OUT --> T
```
```bash
# Install Substreams CLI
brew install streamingfast/tap/substreams

# Install substreams-sink-sql
brew install streamingfast/tap/substreams-sink-sql

# Install buf (development only)
brew install bufbuild/buf/buf
```

```bash
# Substreams registry authentication (interactive)
substreams registry login

# Optional (non-interactive): provide the token via env or a local .substreams.env file
# echo 'SUBSTREAMS_API_TOKEN=...' > .substreams.env
```

```bash
# Build WASM
cargo build --target wasm32-unknown-unknown --release

# Create package
substreams pack substreams.yaml
```
```bash
source .substreams.env

# Test the main sink module (DatabaseChanges output)
substreams gui substreams.yaml map_db_out

# Or test individual modules
substreams gui substreams.yaml map_pump_create          # Token creations
substreams gui substreams.yaml map_pump_bonding_trades  # Bonding curve trades
substreams gui substreams.yaml map_pump_amm_trades      # AMM trades
substreams gui substreams.yaml map_detect_sniper        # Sniper detection
```
This repo ships a DatabaseChanges sink module (map_db_out) and writes to Postgres via substreams-sink-sql.
Services:
- PostgreSQL: `localhost:5432`
- pgweb: http://localhost:8081

```bash
# 1) Start Postgres + pgweb
docker compose up -d

# 2) Apply schema
# WARNING: schema.sql DROPS the `public` schema (reset script). Safe for local/dev only.
docker exec -i daiko-pumpfun-substreams-postgres psql -U dev -d main < schema.sql

# 3) Provide Substreams API token (or ensure you are logged in via `substreams registry login`)
source .substreams.env

# 4) Create sink internal tables (cursor/history)
substreams-sink-sql setup \
  "postgres://dev:insecure@localhost:5432/main?sslmode=disable" \
  ./daiko_pump_fun_substreams-v0.2.0.spkg \
  --cursors-table _cursors \
  --history-table _substreams_history
```
If you just want to confirm "events are flowing and rows are written", run a short range and flush more frequently:
Note: substreams-sink-sql run does not accept a module name as a positional argument. The optional 3rd argument is a block range in the form <start>:<stop>. The sink output module (this repo uses map_db_out) is inferred from the package.
```bash
source .substreams.env

substreams-sink-sql run \
  "postgres://dev:insecure@localhost:5432/main?sslmode=disable" \
  ./daiko_pump_fun_substreams-v0.2.0.spkg \
  387793194:387794000 \
  --cursors-table _cursors \
  --history-table _substreams_history \
  --endpoint mainnet.sol.streamingfast.io:443 \
  --batch-block-flush-interval 50 \
  --batch-row-flush-interval 5000
```
If you want to validate against the latest chain head without manually picking slots, use the Makefile helper:
```bash
# Start 50 blocks behind head and stop 200 blocks ahead of head (bounded run)
make sink-smoke
```
This resolves the current Solana head slot via `solana slot` and runs an absolute range.
Note: this uses the same cursor tables as other targets. If you run it on the same database,
it can advance your cursor.
If you want to start from the current chain head (or slightly behind head) and keep streaming:
```bash
# Start 50 blocks behind head and keep streaming (no stop block)
make sink-head HEAD_FROM_HEAD=-50
```
Notes:
- `store_known_mints` / `store_holder_count` will only reflect what they see from the chosen start block onward. If you need fully accurate historical state, use `make sink` (backfill) instead.
- `substreams-sink-sql` does not support an "infinite stop" in the `<start>:<stop>` range syntax. `make sink-head` uses a very large stop (`HEAD_TO_HEAD`, default: 1_000_000_000 blocks ahead of head) to behave like a long-running stream without falling back to block 0 on a fresh cursor table.
- If you want to watch data in pgweb while validating pump.fun in the browser, run without a stop block.
```bash
source .substreams.env

substreams-sink-sql run \
  "postgres://dev:insecure@localhost:5432/main?sslmode=disable" \
  ./daiko_pump_fun_substreams-v0.2.0.spkg \
  --cursors-table _cursors \
  --history-table _substreams_history \
  --endpoint mainnet.sol.streamingfast.io:443 \
  --infinite-retry
```
Notes:
- Once the sink reaches the chain head, it reports `live: true` in the stream stats.
- The sink first backfills from the manifest `initialBlock` (or resumes from `_cursors` if present) before it becomes live.
- During backfill, flushes are controlled by `--batch-block-flush-interval` (default 1000) and `--batch-row-flush-interval` (default 100000).
- In live mode, flushes are controlled by `--live-block-flush-interval` (default 1).
- Real-time ingestion is a trade-off: lower latency requires more frequent transactions (more DB load).
Recommended starting point for Postgres (single node) if latency matters:
- `--live-block-flush-interval 5` (flush every 5 blocks; typically a few seconds on Solana)
- `--batch-block-flush-interval 200` and `--batch-row-flush-interval 20000`

Example:
```bash
source .substreams.env

substreams-sink-sql run \
  "postgres://dev:insecure@localhost:5432/main?sslmode=disable" \
  ./daiko_pump_fun_substreams-v0.2.0.spkg \
  --cursors-table _cursors \
  --history-table _substreams_history \
  --endpoint mainnet.sol.streamingfast.io:443 \
  --batch-block-flush-interval 200 \
  --batch-row-flush-interval 20000 \
  --live-block-flush-interval 5 \
  --infinite-retry
```
Tuning tips:
- If the database falls behind, increase `--live-block-flush-interval` (e.g. 10, 20) first.
- Keep `--undo-buffer-size 0` (default) and use a modest live flush interval.

On a brand-new database, run once:
```bash
# WARNING: schema.sql DROPS the `public` schema (reset script).
psql "<POSTGRES_DSN>" -f schema.sql

substreams-sink-sql setup \
  "<POSTGRES_DSN>" \
  ./daiko_pump_fun_substreams-v0.2.0.spkg \
  --cursors-table _cursors \
  --history-table _substreams_history
```
Then start the long-running sink (backfills from the manifest initialBlock and then continues streaming in live mode):
```bash
export SUBSTREAMS_API_TOKEN="<YOUR_TOKEN>"  # set via secret manager
export DLOG="info"

substreams-sink-sql run \
  "<POSTGRES_DSN>" \
  ./daiko_pump_fun_substreams-v0.2.0.spkg \
  --cursors-table _cursors \
  --history-table _substreams_history \
  --endpoint mainnet.sol.streamingfast.io:443 \
  --infinite-retry
```
Run it under a process supervisor (systemd, Docker, Kubernetes). If you just need a quick background run:
```bash
nohup substreams-sink-sql run \
  "<POSTGRES_DSN>" \
  ./daiko_pump_fun_substreams-v0.2.0.spkg \
  --cursors-table _cursors \
  --history-table _substreams_history \
  --endpoint mainnet.sol.streamingfast.io:443 \
  --infinite-retry \
  > sink.log 2>&1 &
```
Notes:
- For faster visibility during backfill, lower the `--batch-*` intervals.
- If you only need finalized data (no reorg handling), consider `--final-blocks-only`.

Open http://localhost:8081 (pgweb) in your browser.
```bash
# List tables
docker exec daiko-pumpfun-substreams-postgres psql -U dev -d main -c "\dt"

# Trade count
docker exec daiko-pumpfun-substreams-postgres psql -U dev -d main -c "SELECT COUNT(*) FROM trades"

# Latest trades (unified bonding curve + AMM)
docker exec daiko-pumpfun-substreams-postgres psql -U dev -d main -c \
  "SELECT mint_address, sol_amount, token_amount, side, trade_source, wallet_address, block_timestamp
   FROM trades
   ORDER BY block DESC
   LIMIT 10"

# New token creations
docker exec daiko-pumpfun-substreams-postgres psql -U dev -d main -c \
  "SELECT symbol, mint_address, creator_address, bonding_curve_address, created_timestamp
   FROM tokens
   ORDER BY created_block DESC
   LIMIT 10"

# Wallet labels (sniper, bundler, insider detection)
docker exec daiko-pumpfun-substreams-postgres psql -U dev -d main -c \
  "SELECT mint_address, wallet_address, label_kind, detected_timestamp
   FROM wallet_labels
   ORDER BY detected_block DESC
   LIMIT 10"

# Cursor position (progress)
docker exec daiko-pumpfun-substreams-postgres psql -U dev -d main -c \
  "SELECT block_num, block_id FROM _cursors"
```
⚠️ This project does NOT own the schema definition.
The PostgreSQL schema for substreams.* tables is owned and managed by agentic-terminal.
- Schema source: `agentic-terminal/packages/db/src/schema/substreams-schema.ts`
- This project only writes to `substreams.*` tables via `DatabaseChanges`

```bash
# Fetch schema from agentic-terminal
make fetch-schema

# This will:
# 1. cd ../agentic-terminal
# 2. bun run export-substreams-schema
# 3. Generate schema.sql in this directory
```
The generated schema.sql is gitignored because it's a build artifact.
Workflow when the schema changes:
1. Schema changes are made in `agentic-terminal`
2. Run `make fetch-schema` in this project to get the latest schema
3. Update the sink output (`DatabaseChanges`) if needed

Division of responsibilities:
- `agentic-terminal` = Schema definition + Query + Type generation
- `daiko_pump_fun_substreams` = Data writing only

Related projects:
- `agentic-terminal` - Schema owner, query layer, TypeScript types
- `meme-scrapers` - TikTok/KnowYourMeme scraper (uses `meme.*` schema)

For the authoritative schema definition, see `agentic-terminal/packages/db/src/schema/substreams-schema.ts`.
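To illustrate the "data writing only" role, here is a minimal, hypothetical sketch of emitting a row for the externally-owned `tokens` table with the `substreams-database-change` helper crate. The input message and its field names are assumptions for this sketch, not the repo's actual code:

```rust
use substreams::errors::Error;
use substreams_database_change::pb::database::DatabaseChanges;
use substreams_database_change::tables::Tables;

// Hypothetical input message produced by an upstream map module.
use crate::pb::substreams::v1::program::TokenCreatedEvents;

#[substreams::handlers::map]
fn map_db_out(created: TokenCreatedEvents) -> Result<DatabaseChanges, Error> {
    let mut tables = Tables::new();
    for token in created.tokens {
        // One row per newly created token; the schema itself lives in agentic-terminal.
        tables
            .create_row("tokens", token.mint_address.clone()) // PK: mint_address
            .set("symbol", token.symbol)
            .set("creator_address", token.creator_address)
            .set("created_block", token.block_slot);
    }
    Ok(tables.to_database_changes())
}
```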
Main tables (defined by the external schema):

- `tokens`: PK (`mint_address`)
- `token_metrics`: PK (`mint_address`)
  - `price_sol`, `mc_sol`, `ath_price_sol`, `ath_mc_sol`: `numeric(38,18)`
  - ATH columns are maintained by the `trg_token_metrics_ath_update` trigger
- `trades`: PK (`signature`, `ix_index`, `inner_ix_index`, `event_index`)
  - `trade_source`: `bonding_curve` or `amm`
  - `price_sol`: `numeric(38,18)`
  - `pool_address` is populated for AMM trades
- `wallets`: PK (`address`)
- `wallet_labels`: PK (`mint_address`, `wallet_address`, `label_kind`)

Note: the `token_account_changes` table has been deprecated. Balance changes are processed internally for `holder_count` computation but are not persisted to the database.

These tables are used by `substreams-sink-sql` for cursor tracking and are now schema-qualified:
- `substreams._cursors` - Stores the latest processed cursor for resume
- `substreams._substreams_history` - Stores processing history for reorg/undo support

Note: When running the sink, use schema-qualified table names:
```bash
substreams-sink-sql run ... \
  --cursors-table "substreams._cursors" \
  --history-table "substreams._substreams_history"
```
This project automatically publishes to the Substreams Registry on every push to main that modifies source code.
The GitHub Actions workflow (.github/workflows/publish-spkg.yml) will:
1. Build the WASM module and pack the package defined in `substreams.yaml`
2. Publish the resulting `.spkg` to the registry on pushes to `main`

Required GitHub Secrets:
- `SUBSTREAMS_API_TOKEN`: Your Substreams registry token (get from https://substreams.dev/me)

If you need to publish manually:
```bash
source .substreams.env

# Build and pack first
cargo build --target wasm32-unknown-unknown --release
substreams pack substreams.yaml

# Publish (requires registry auth)
# - For local use: run `substreams registry login` once, or set SUBSTREAMS_API_TOKEN in .substreams.env
substreams publish daiko_pump_fun_substreams-v0.2.0.spkg --yes
```
Consumers (like agentic-terminal) can reference the published package directly:
```bash
# In substreams-sink-sql
substreams-sink-sql run \
  "$DATABASE_URL" \
  "spkg.io/daikolabs/daiko_pump_fun_substreams-v0.2.0" \
  --endpoint mainnet.sol.streamingfast.io:443

# In Docker (agentic-terminal)
SPKG=spkg.io/daikolabs/daiko_pump_fun_substreams-v0.2.0 docker compose up -d
```
No local `.spkg` file (and no checkout of this repository) is needed for consumption.
```
.
├── Cargo.toml            # Rust dependencies & build configuration
├── substreams.yaml       # Substreams module definitions
├── docker-compose.yaml   # PostgreSQL + pgweb for local development
├── Makefile              # Common development commands
│
├── idls/                 # Anchor IDL files (INPUT - do not edit)
│   ├── pump.json         # pump.fun Bonding Curve program IDL
│   └── pump_amm.json     # Pump AMM program IDL
│
├── proto/                # User-defined Protobuf messages (INPUT)
│   ├── program.proto     # Domain events (TradeEvent, TokenCreated, etc.)
│   └── database.proto    # DatabaseChanges for SQL sink
│
└── src/
    ├── lib.rs            # Substreams module entry points
    │
    ├── idl/              # IDL module declarations
    │   └── mod.rs        # declare_program!() → generates types at compile time
    │
    ├── pb/               # Generated Protobuf code (OUTPUT - do not edit)
    │   ├── mod.rs                             # Module exports
    │   ├── substreams.v1.program.rs           # Generated from proto/program.proto
    │   └── sf.substreams.sink.database.v1.rs  # Generated from proto/database.proto
    │
    ├── programs/         # Program-specific logic
    │   ├── pumpfun/      # pump.fun Bonding Curve handlers
    │   │   ├── events.rs       # Event parsing using idl::pump::events
    │   │   ├── map_create.rs   # Token creation extraction
    │   │   └── map_trades.rs   # Trade event extraction
    │   └── pump_amm/     # Pump AMM handlers
    │       ├── events.rs       # Event parsing using idl::pump_amm::events
    │       └── map_trades.rs   # AMM trade extraction
    │
    ├── modules/          # Substreams module implementations
    │   ├── map/          # Map modules (stateless fact extraction)
    │   ├── stores/       # Store modules (minimal state)
    │   ├── detect/       # Detection modules (sniper, bundler, insider)
    │   └── sinks/        # Sink modules (DatabaseChanges output)
    │
    └── utils/            # Shared utilities
        ├── meta.rs           # ChainMeta construction
        ├── token_balance.rs  # Token balance change detection
        └── tx.rs             # Transaction parsing helpers
```
When you modify files in proto/, regenerate Rust code:
```bash
substreams protogen substreams.yaml --exclude-paths=sf/substreams/rpc,sf/substreams/v1,google
```
Note: the generated files in `src/pb/` should not be edited manually.
IDL code is generated automatically at compile time by the declare_program!() macro. No manual step is required.
If you update the IDL files in idls/, simply rebuild:
```bash
cargo build --target wasm32-unknown-unknown --release
```
```bash
# Format code
cargo fmt

# Check formatting (no changes)
cargo fmt -- --check

# Or use Makefile
make fmt        # Format
make fmt-check  # Format check
```
```bash
# Run lint
cargo clippy

# Run lint (treat warnings as errors)
cargo clippy -- -D warnings

# Fix auto-fixable issues
cargo clippy --fix --allow-dirty --allow-staged

# Or use Makefile
make lint      # Run lint
make lint-fix  # Auto-fix
make check     # Format check + lint
```
```bash
# Start
docker compose up -d

# Stop
docker compose down

# Restart (keep data)
docker compose down
docker compose up -d

# Hard reset (wipe Postgres data) + restart
# NOTE: Postgres data is bind-mounted to ./data/postgres in docker-compose.yaml
docker compose down --volumes
rm -rf ./data/postgres
docker compose up -d

# Complete removal (including volumes)
docker compose down -v
```
License: MIT
**Q: The `_cursors` / `_substreams_history` tables are created automatically. What are they?**

Those are internal tables created by `substreams-sink-sql` (the SQL sink), not tables defined by this project.
They exist to support:
- `_cursors` stores the latest processed cursor so the sink can restart from the correct position.
- `_substreams_history` stores processing history, used for reorg/undo support.

**Q: Can I delete them?**

In general, no. Deleting them loses progress tracking and can cause unexpected reprocessing or duplication.
If you want a clean re-sync from scratch, prefer resetting the database explicitly (e.g. docker compose down -v to drop the volume and re-initialize).
Per-module `substreams gui` commands against the published package:

```bash
substreams gui daiko-pump-fun-substreams@v0.2.6 map_pump_create
substreams gui daiko-pump-fun-substreams@v0.2.6 map_pump_graduations
substreams gui daiko-pump-fun-substreams@v0.2.6 map_pump_bonding_trades
substreams gui daiko-pump-fun-substreams@v0.2.6 map_pump_amm_trades
substreams gui daiko-pump-fun-substreams@v0.2.6 map_token_account_changes
substreams gui daiko-pump-fun-substreams@v0.2.6 map_token_transfers
substreams gui daiko-pump-fun-substreams@v0.2.6 map_pump_amm_pools
substreams gui daiko-pump-fun-substreams@v0.2.6 map_detect_sniper
substreams gui daiko-pump-fun-substreams@v0.2.6 map_detect_bundler
substreams gui daiko-pump-fun-substreams@v0.2.6 map_detect_insider
substreams gui daiko-pump-fun-substreams@v0.2.6 map_db_out
substreams gui daiko-pump-fun-substreams@v0.2.6 solana:blocks_without_votes
substreams gui daiko-pump-fun-substreams@v0.2.6 store_known_mints
substreams gui daiko-pump-fun-substreams@v0.2.6 store_token_creator
substreams gui daiko-pump-fun-substreams@v0.2.6 store_holder_count
substreams gui daiko-pump-fun-substreams@v0.2.6 store_mint_has_trade
substreams gui daiko-pump-fun-substreams@v0.2.6 store_pump_amm_pool_mints
```