gRPC Streaming

Why choose gRPC – lowest latency per‑message, Protobuf types, HTTP/2 transport.

Requires a Dedicated Node or Laserstream – both provide a gRPC endpoint and auth token.

How it works – open a bidirectional stream, send a SubscribeRequest, then receive a continuous feed of Pump AMM transactions. The sample handles reconnection up to ten times with exponential back‑off.

Requirements

1

Set Up Node.js Environment

Ensure you have Node.js ≥ 18 (tested with v20) installed on your system. You can check your version with:

node --version

If needed, install or update Node.js from nodejs.org.

2

Install TypeScript

If you plan to run the .ts samples with ts-node, install TypeScript ≥ 5:

npm install -g typescript ts-node

Verify the installation:

tsc --version
ts-node --version
3

Get Helius Access

You’ll need a Helius Dedicated Node or Laserstream subscription. Sign up through the Helius Dashboard if you don’t already have access.

4

Obtain gRPC Credentials

After subscribing to a Dedicated Node or Laserstream, you’ll receive:

  • A gRPC endpoint URL
  • An authentication token

Keep these secure as you’ll need them for your environment variables.

Implementation

1

Install Dependencies

npm install @triton-one/yellowstone-grpc bs58
2

Create the Stream Manager

Create a file named grpc-pump-stream.ts with the following code:

// grpc-pump-stream.ts
import Client, {
  CommitmentLevel, SubscribeRequest
} from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';

class GrpcStreamManager {
  private client: Client;
  private stream: any;
  private isConnected: boolean = false;
  private reconnectAttempts: number = 0;
  private readonly maxReconnectAttempts: number = 10;
  private readonly reconnectInterval: number = 5000; // 5 seconds
  private readonly dataHandler: (data: any) => void;

  constructor(
    endpoint: string,
    authToken: string,
    dataHandler: (data: any) => void
  ) {
    this.client = new Client(
      endpoint,
      authToken,
      { "grpc.max_receive_message_length": 64 * 1024 * 1024 }
    );
    this.dataHandler = dataHandler;
  }

  async connect(subscribeRequest: SubscribeRequest): Promise<void> {
    try {
      this.stream = await this.client.subscribe();
      this.isConnected = true;
      this.reconnectAttempts = 0;

      this.stream.on("data", this.handleData.bind(this));
      this.stream.on("error", this.handleError.bind(this));
      this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
      this.stream.on("close", () => this.handleDisconnect(subscribeRequest));

      await this.write(subscribeRequest);
      this.startPing();
    } catch (error) {
      console.error("Connection error:", error);
      await this.reconnect(subscribeRequest);
    }
  }

  private async write(req: SubscribeRequest): Promise<void> {
    return new Promise((resolve, reject) => {
      this.stream.write(req, (err: any) => err ? reject(err) : resolve());
    });
  }

  private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnectAttempts++;
    console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

    setTimeout(async () => {
      try {
        await this.connect(subscribeRequest);
      } catch (error) {
        console.error("Reconnection failed:", error);
        await this.reconnect(subscribeRequest);
      }
    }, this.reconnectInterval * Math.min(this.reconnectAttempts, 5));
  }

  private startPing(): void {
    setInterval(() => {
      if (this.isConnected) {
        this.write({
          ping: { id: 1 },
          accounts: {},
          accountsDataSlice: [],
          transactions: {},
          blocks: {},
          blocksMeta: {},
          entry: {},
          slots: {},
          transactionsStatus: {},
        }).catch(console.error);
      }
    }, 30000);
  }

  private handleData(data: any): void {
    try {
      const processed = this.processBuffers(data);
      this.dataHandler(processed);
    } catch (error) {
      console.error("Error processing data:", error);
    }
  }

  private handleError(error: any): void {
    console.error("Stream error:", error);
    this.isConnected = false;
  }

  private handleDisconnect(subscribeRequest: SubscribeRequest): void {
    console.log("Stream disconnected");
    this.isConnected = false;
    this.reconnect(subscribeRequest);
  }

  private processBuffers(obj: any): any {
    if (!obj) return obj;
    if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
      return bs58.default.encode(obj); // Encode Buffers to base58
    }
    if (Array.isArray(obj)) {
      return obj.map(item => this.processBuffers(item));
    }
    if (typeof obj === 'object') {
      return Object.fromEntries(
        Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
      );
    }
    return obj;
  }
}
3

Implement Transaction Monitoring

Add the transaction monitoring code to your file:

// Transaction monitoring implementation
async function monitorTransactions() {
  const manager = new GrpcStreamManager(
    process.env.GRPC_ENDPOINT || "your-grpc-url",
    process.env.GRPC_AUTH_TOKEN || "your-grpc-token",
    handleTransactionUpdate
  );

  const subscribeRequest: SubscribeRequest = {
    transactions: {
      client: {
        accountInclude: ["pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA"],
        accountExclude: [],
        accountRequired: [],
        vote: false,
        failed: false
      }
    },
    commitment: CommitmentLevel.PROCESSED,
    accounts: {},
    accountsDataSlice: [],
    blocks: {},
    blocksMeta: {},
    entry: {},
    slots: {},
    transactionsStatus: {}
  };

  await manager.connect(subscribeRequest);
}

function handleTransactionUpdate(data: any): void {
  if (data?.transaction?.transaction) {
    console.log(JSON.stringify(data.transaction, null, 2));
  }
}

monitorTransactions().catch(console.error);
4

Set Environment Variables

Create a .env file or set environment variables with your gRPC endpoint and auth token:

export GRPC_ENDPOINT="your-grpc-endpoint"
export GRPC_AUTH_TOKEN="your-auth-token"

Replace the placeholder values with your actual Helius Dedicated Node or Laserstream endpoint and authentication token.

5

Run the Application

Execute the script to start streaming Pump AMM data:

npx ts-node grpc-pump-stream.ts

You should see JSON-formatted Pump AMM transactions streaming in your terminal.

Key benefits

  • Lowest latency - gRPC’s binary protocol is optimized for high-throughput, low-latency data streaming
  • Structured data - Get fully parsed transaction objects with all accounts and instructions
  • Resilient connection - Built-in reconnection logic with exponential backoff
  • Production ready - Designed for high-volume applications that need reliable data streams

Common issues and solutions

Next steps

1

Implement Message Queue

Add a message queue like RabbitMQ or Kafka to process transactions asynchronously:

// Example with RabbitMQ
import amqp from 'amqplib';

async function setupQueue() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  await channel.assertQueue('pump-transactions');
  return channel;
}

// Then in your handleTransactionUpdate function:
async function handleTransactionUpdate(data, channel) {
  if (data?.transaction?.transaction) {
    channel.sendToQueue('pump-transactions', 
      Buffer.from(JSON.stringify(data.transaction)));
  }
}
2

Add Persistent Storage

Implement a database solution to store transaction data for later analysis:

import { MongoClient } from 'mongodb';

async function setupDatabase() {
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('pump-amm');
  return db.collection('transactions');
}

// Store transactions in the database
async function handleTransactionUpdate(data, collection) {
  if (data?.transaction?.transaction) {
    await collection.insertOne({
      timestamp: new Date(),
      transaction: data.transaction
    });
  }
}
3

Create Alerts

Set up a notification system for important events or threshold crossings:

import axios from 'axios';

// Example for Discord webhook notifications
async function sendAlert(message) {
  await axios.post('YOUR_WEBHOOK_URL', {
    content: message
  });
}

// Monitor for large transactions
async function handleTransactionUpdate(data) {
  if (data?.transaction?.transaction) {
    // Example: Alert on transactions with large amounts
    const tx = data.transaction;
    // Check for specific conditions in your transaction data
    if (someSignificantCondition) {
      await sendAlert(`Large transaction detected: ${tx.signature}`);
    }
  }
}