gRPC Streaming
- Why choose gRPC – lowest per-message latency, Protobuf types, HTTP/2 transport.
- Requires a Dedicated Node or LaserStream – both provide a gRPC endpoint and auth token.
- How it works – open a bidirectional stream, send a `SubscribeRequest`, then receive a continuous feed of Pump AMM transactions. The sample handles reconnection up to ten times with exponential back-off.
Requirements
Set Up Node.js Environment
Ensure you have Node.js ≥ 18 (tested with v20) installed on your system. You can check your version with:

node --version

If needed, install or update Node.js from nodejs.org.
Install TypeScript
If you plan to run the `.ts` samples with `ts-node`, install TypeScript ≥ 5:
npm install -g typescript ts-node
Verify the installation:
tsc --version
ts-node --version
Get Helius Access
You’ll need a Helius Dedicated Node or LaserStream subscription. Sign up through the Helius Dashboard if you don’t already have access.
Obtain gRPC Credentials
After subscribing to a Dedicated Node or LaserStream, you’ll receive:
- A gRPC endpoint URL
- An authentication token
Keep these secure; you'll need them for your environment variables.
Implementation
Install Dependencies
npm install @triton-one/yellowstone-grpc bs58
Create the Stream Manager
Create a file named `grpc-pump-stream.ts` with the following code:
// grpc-pump-stream.ts
import Client, {
  CommitmentLevel,
  SubscribeRequest,
} from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';

class GrpcStreamManager {
  private client: Client;
  private stream: any;
  private pingIntervalId: ReturnType<typeof setInterval> | null = null;
  private isConnected: boolean = false;
  private reconnectAttempts: number = 0;
  private readonly maxReconnectAttempts: number = 10;
  private readonly reconnectInterval: number = 5000; // base delay: 5 seconds
  private readonly dataHandler: (data: any) => void;

  constructor(
    endpoint: string,
    authToken: string,
    dataHandler: (data: any) => void
  ) {
    this.client = new Client(
      endpoint,
      authToken,
      { "grpc.max_receive_message_length": 64 * 1024 * 1024 } // 64 MiB
    );
    this.dataHandler = dataHandler;
  }

  async connect(subscribeRequest: SubscribeRequest): Promise<void> {
    try {
      this.stream = await this.client.subscribe();
      this.isConnected = true;
      this.reconnectAttempts = 0;

      this.stream.on("data", this.handleData.bind(this));
      this.stream.on("error", this.handleError.bind(this));
      this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
      this.stream.on("close", () => this.handleDisconnect(subscribeRequest));

      await this.write(subscribeRequest);
      this.startPing();
    } catch (error) {
      console.error("Connection error:", error);
      await this.reconnect(subscribeRequest);
    }
  }

  private async write(req: SubscribeRequest): Promise<void> {
    return new Promise((resolve, reject) => {
      this.stream.write(req, (err: any) => (err ? reject(err) : resolve()));
    });
  }

  private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnectAttempts++;
    console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

    // Exponential back-off: 5s, 10s, 20s, 40s, 80s, then capped at 80s
    const delay =
      this.reconnectInterval * Math.pow(2, Math.min(this.reconnectAttempts - 1, 4));

    setTimeout(async () => {
      try {
        await this.connect(subscribeRequest);
      } catch (error) {
        console.error("Reconnection failed:", error);
        await this.reconnect(subscribeRequest);
      }
    }, delay);
  }

  private startPing(): void {
    // Clear any previous keep-alive timer so reconnects don't stack intervals
    if (this.pingIntervalId) {
      clearInterval(this.pingIntervalId);
    }
    this.pingIntervalId = setInterval(() => {
      if (this.isConnected) {
        // An otherwise-empty SubscribeRequest carrying a ping keeps the stream alive
        this.write({
          ping: { id: 1 },
          accounts: {},
          accountsDataSlice: [],
          transactions: {},
          blocks: {},
          blocksMeta: {},
          entry: {},
          slots: {},
          transactionsStatus: {},
        }).catch(console.error);
      }
    }, 30000);
  }

  private handleData(data: any): void {
    try {
      const processed = this.processBuffers(data);
      this.dataHandler(processed);
    } catch (error) {
      console.error("Error processing data:", error);
    }
  }

  private handleError(error: any): void {
    console.error("Stream error:", error);
    this.isConnected = false;
  }

  private handleDisconnect(subscribeRequest: SubscribeRequest): void {
    console.log("Stream disconnected");
    this.isConnected = false;
    this.reconnect(subscribeRequest);
  }

  // Recursively convert Buffers/Uint8Arrays (signatures, pubkeys, data
  // fields) into base58 strings so updates are easy to read and log
  private processBuffers(obj: any): any {
    if (!obj) return obj;
    if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
      return bs58.default.encode(obj); // Encode Buffers to base58
    }
    if (Array.isArray(obj)) {
      return obj.map(item => this.processBuffers(item));
    }
    if (typeof obj === 'object') {
      return Object.fromEntries(
        Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
      );
    }
    return obj;
  }
}
Implement Transaction Monitoring
Add the transaction monitoring code to your file:
// Transaction monitoring implementation
async function monitorTransactions() {
  const manager = new GrpcStreamManager(
    process.env.GRPC_ENDPOINT || "your-grpc-url",
    process.env.GRPC_AUTH_TOKEN || "your-grpc-token",
    handleTransactionUpdate
  );

  // Subscribe to all successful, non-vote transactions that involve the
  // Pump AMM program
  const subscribeRequest: SubscribeRequest = {
    transactions: {
      client: {
        accountInclude: ["pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA"],
        accountExclude: [],
        accountRequired: [],
        vote: false,
        failed: false
      }
    },
    commitment: CommitmentLevel.PROCESSED,
    accounts: {},
    accountsDataSlice: [],
    blocks: {},
    blocksMeta: {},
    entry: {},
    slots: {},
    transactionsStatus: {}
  };

  await manager.connect(subscribeRequest);
}

function handleTransactionUpdate(data: any): void {
  if (data?.transaction?.transaction) {
    console.log(JSON.stringify(data.transaction, null, 2));
  }
}

monitorTransactions().catch(console.error);
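The sample subscribes at PROCESSED commitment for minimum latency. If you would rather trade a little latency for more stable data, a one-line variation of the request above (not something the sample does by default) is to raise the commitment level:

  // Wait for cluster confirmation instead of first-seen processing
  commitment: CommitmentLevel.CONFIRMED,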
Set Environment Variables
Create a `.env` file or set environment variables with your gRPC endpoint and auth token:
export GRPC_ENDPOINT="your-grpc-endpoint"
export GRPC_AUTH_TOKEN="your-auth-token"
Replace the placeholder values with your actual Helius Dedicated Node or LaserStream endpoint and authentication token.
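Note that the sample reads `process.env` directly and does not load a `.env` file on its own. If you go the `.env` route, one minimal approach (assuming you also run npm install dotenv, which is not part of the dependency step above) is to add a single import at the top of `grpc-pump-stream.ts`:

// Load .env into process.env before anything reads it (requires the dotenv package)
import "dotenv/config";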
Run the Application
Execute the script to start streaming Pump AMM data:
npx ts-node grpc-pump-stream.ts
You should see JSON-formatted Pump AMM transactions streaming in your terminal.
Key benefits
- Lowest latency - gRPC’s binary protocol is optimized for high-throughput, low-latency data streaming
- Structured data - Get fully parsed transaction objects with all accounts and instructions
- Resilient connection - Built-in reconnection logic with exponential backoff
- Production ready - Designed for high-volume applications that need reliable data streams
Next steps
Implement Message Queue
Add a message queue like RabbitMQ or Kafka to process transactions asynchronously:
// Example with RabbitMQ
import amqp, { Channel } from 'amqplib';

async function setupQueue(): Promise<Channel> {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  await channel.assertQueue('pump-transactions');
  return channel;
}

// Then in your handleTransactionUpdate function:
async function handleTransactionUpdate(data: any, channel: Channel) {
  if (data?.transaction?.transaction) {
    channel.sendToQueue('pump-transactions',
      Buffer.from(JSON.stringify(data.transaction)));
  }
}
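Because GrpcStreamManager invokes its handler with a single data argument, one way to pass the channel along is a closure. This is a sketch of the wiring, reusing the setup from monitorTransactions inside an async entry point:

const channel = await setupQueue();
const manager = new GrpcStreamManager(
  process.env.GRPC_ENDPOINT || "your-grpc-url",
  process.env.GRPC_AUTH_TOKEN || "your-grpc-token",
  (data) => void handleTransactionUpdate(data, channel) // capture channel in the closure
);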
Add Persistent Storage
Implement a database solution to store transaction data for later analysis:
import { MongoClient, Collection } from 'mongodb';

async function setupDatabase(): Promise<Collection> {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('pump-amm');
  return db.collection('transactions');
}

// Store transactions in the database (wire the collection in via a
// closure, just like the RabbitMQ channel above)
async function handleTransactionUpdate(data: any, collection: Collection) {
  if (data?.transaction?.transaction) {
    await collection.insertOne({
      timestamp: new Date(),
      transaction: data.transaction
    });
  }
}
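If you expect to query stored transactions by time range, it may also be worth creating an index on the timestamp field. A small sketch using the driver's createIndex, run once during setup:

// Speeds up time-range queries such as "the last hour of transactions"
await collection.createIndex({ timestamp: 1 });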
Create Alerts
Set up a notification system for important events or threshold crossings:
import axios from 'axios';

// Example for Discord webhook notifications
async function sendAlert(message: string) {
  await axios.post('YOUR_WEBHOOK_URL', {
    content: message
  });
}

// Monitor for large transactions
async function handleTransactionUpdate(data: any) {
  if (data?.transaction?.transaction) {
    const tx = data.transaction;
    // Replace this placeholder with your own check, e.g. a swap amount
    // above some threshold pulled from the decoded instruction data
    const someSignificantCondition = false;
    if (someSignificantCondition) {
      // After processBuffers, the signature is a base58 string on the
      // inner transaction object
      await sendAlert(`Large transaction detected: ${tx.transaction.signature}`);
    }
  }
}