Entry Monitoring with Yellowstone gRPC

This guide demonstrates how to use Yellowstone gRPC to monitor block entries on Solana. Block entries represent the fundamental units of execution on the Solana blockchain, containing transaction batches and their execution results. You’ll learn how to track entries with reliable reconnection handling.

TypeScript Implementation

import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';

class GrpcStreamManager {
    private client: Client;
    private stream: any;
    private isConnected: boolean = false;
    private reconnectAttempts: number = 0;
    private readonly maxReconnectAttempts: number = 10;
    private readonly reconnectInterval: number = 5000; // 5 seconds
    private readonly dataHandler: (data: any) => void;

    constructor(
        endpoint: string,
        authToken: string,
        dataHandler: (data: any) => void
    ) {
        this.client = new Client(
            endpoint,
            authToken,
            { "grpc.max_receive_message_length": 64 * 1024 * 1024 }
        );
        this.dataHandler = dataHandler;
    }

    async connect(subscribeRequest: SubscribeRequest): Promise<void> {
        try {
            this.stream = await this.client.subscribe();
            this.isConnected = true;
            this.reconnectAttempts = 0;

            this.stream.on("data", this.handleData.bind(this));
            this.stream.on("error", this.handleError.bind(this));
            this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
            this.stream.on("close", () => this.handleDisconnect(subscribeRequest));

            await this.write(subscribeRequest);
            this.startPing();
        } catch (error) {
            console.error("Connection error:", error);
            await this.reconnect(subscribeRequest);
        }
    }

    private async write(req: SubscribeRequest): Promise<void> {
        return new Promise((resolve, reject) => {
            this.stream.write(req, (err: any) => err ? reject(err) : resolve());
        });
    }

    private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error("Max reconnection attempts reached");
            return;
        }

        this.reconnectAttempts++;
        console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

        setTimeout(async () => {
            try {
                await this.connect(subscribeRequest);
            } catch (error) {
                console.error("Reconnection failed:", error);
                await this.reconnect(subscribeRequest);
            }
        }, this.reconnectInterval * Math.min(this.reconnectAttempts, 5));
    }

    private startPing(): void {
        setInterval(() => {
            if (this.isConnected) {
                this.write({
                    ping: { id: 1 },
                    accounts: {},
                    accountsDataSlice: [],
                    transactions: {},
                    blocks: {},
                    blocksMeta: {},
                    entry: {},
                    slots: {},
                    transactionsStatus: {},
                }).catch(console.error);
            }
        }, 30000);
    }

    private handleData(data: any): void {
        try {
            const processed = this.processBuffers(data);
            this.dataHandler(processed);
        } catch (error) {
            console.error("Error processing data:", error);
        }
    }

    private handleError(error: any): void {
        console.error("Stream error:", error);
        this.isConnected = false;
    }

    private handleDisconnect(subscribeRequest: SubscribeRequest): void {
        console.log("Stream disconnected");
        this.isConnected = false;
        this.reconnect(subscribeRequest);
    }

    private processBuffers(obj: any): any {
        if (!obj) return obj;
        if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
            return bs58.default.encode(obj);
        }
        if (Array.isArray(obj)) {
            return obj.map(item => this.processBuffers(item));
        }
        if (typeof obj === 'object') {
            return Object.fromEntries(
                Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
            );
        }
        return obj;
    }
}

// Entry monitoring implementation
async function monitorEntries() {
    const manager = new GrpcStreamManager(
        "your-grpc-url:2053",
        "your-x-token",
        handleEntryUpdate
    );

    const subscribeRequest: SubscribeRequest = {
        entry: {
            entrySubscribe: {}  // Subscribe to all entries
        },
        accounts: {},
        accountsDataSlice: [],
        slots: {},
        blocks: {},
        blocksMeta: {},
        transactions: {},
        transactionsStatus: {},
        commitment: CommitmentLevel.CONFIRMED,
    };

    console.log('Starting entry monitoring...');
    await manager.connect(subscribeRequest);
}

function handleEntryUpdate(data: any): void {
    if (data?.entry) {
        const entry = data.entry;
        console.log('\n=== Entry Details ===');
        console.log(`Slot: ${entry.slot}`);
        
        if (entry.index !== undefined) {
            console.log(`Entry Index: ${entry.index}`);
        }

        if (entry.numHashes !== undefined) {
            console.log(`Number of Hashes: ${entry.numHashes}`);
        }

        if (entry.hash) {
            console.log(`Hash: ${entry.hash}`);
        }

        if (entry.transactions?.length > 0) {
            console.log('\n=== Entry Transactions ===');
            console.log(`Transaction Count: ${entry.transactions.length}`);
            
            entry.transactions.forEach((tx: any, index: number) => {
                console.log(`\nTransaction ${index + 1}:`);
                if (tx.signature) {
                    console.log(`  Signature: ${tx.signature}`);
                }
                if (tx.isVote !== undefined) {
                    console.log(`  Is Vote: ${tx.isVote}`);
                }
            });
        }

        if (entry.tick !== undefined) {
            console.log('\n=== Tick Information ===');
            console.log(`Is Tick: ${entry.tick}`);
        }

        console.log('\n' + '='.repeat(50) + '\n');
    }
}

// Start entry monitoring
monitorEntries().catch(console.error);

Entry Subscription Options

1. Monitor All Entries

const subscribeRequest: SubscribeRequest = {
    entry: {
        allEntries: {}
    },
    commitment: CommitmentLevel.CONFIRMED,
};

2. Monitor Filtered Entries

const subscribeRequest: SubscribeRequest = {
    entry: {
        filteredEntries: {
            filter: {
                voting: false,  // Exclude vote transactions
                failed: false  // Exclude failed transactions
            }
        }
    },
    commitment: CommitmentLevel.CONFIRMED,
};

Data Structures

  1. Entry Update:
    • Slot number (current slot being processed)
    • Entry index (position in the block)
    • Number of hashes (PoH count)
    • Entry hash (unique identifier)
    • Transaction count (number of transactions in entry)
    • Transactions array (detailed transaction information)
  2. Transaction Details:
    • Signature (unique transaction identifier)
    • Vote status (whether it’s a vote transaction)
    • Transaction message
      • Account keys (involved accounts)
      • Header information
      • Instructions (program calls and data)
      • Recent blockhash
    • Transaction metadata
      • Status (success/failure)
      • Fee amount
      • Balance changes
      • Log messages

Rust Implementation

use {
    anyhow::Result, futures::{sink::SinkExt, stream::StreamExt}, hex, log::error, solana_sdk::pubkey::Pubkey, std::{collections::HashMap, sync::Arc, time::Duration}, tokio::sync::Mutex, tonic::{metadata::errors::InvalidMetadataValue, transport::Endpoint}, tonic_health::pb::health_client::HealthClient, yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken}, yellowstone_grpc_proto::{
        geyser::{
            geyser_client::GeyserClient, subscribe_update::UpdateOneof, SubscribeRequest, SubscribeRequestFilterBlocks, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, SubscribeUpdateBlock, SubscribeUpdateEntry, SubscribeUpdateSlot, CommitmentLevel
        },
        prelude::SubscribeRequestPing,
    }
};

/// Manager for handling gRPC stream connections and updates
struct GrpcStreamManager {
    client: GeyserGrpcClient<InterceptorXToken>,
    is_connected: bool,
    reconnect_attempts: u32,
    max_reconnect_attempts: u32,
    reconnect_interval: Duration,
}

impl GrpcStreamManager {
    /// Creates a new GrpcStreamManager instance
    async fn new(endpoint: &str, x_token: &str) -> Result<Arc<Mutex<Self>>> {
        let interceptor = InterceptorXToken {
            x_token: Some(x_token.parse().map_err(|e: InvalidMetadataValue| anyhow::Error::from(e))?),
            x_request_snapshot: true,
        };

        let channel = Endpoint::from_shared(endpoint.to_string())?
            .connect_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(10))
            .connect()
            .await
            .map_err(|e| anyhow::Error::from(e))?;

        let client = GeyserGrpcClient::new(
            HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
            GeyserClient::with_interceptor(channel, interceptor),
        );

        Ok(Arc::new(Mutex::new(Self {
            client,
            is_connected: false,
            reconnect_attempts: 0,
            max_reconnect_attempts: 10,
            reconnect_interval: Duration::from_secs(5),
        })))
    }

    /// Handles entry update messages from the gRPC stream
    fn handle_entry_update(&self, entry: &SubscribeUpdateEntry) {
        println!("\n=== Entry Details ===");
        println!("Slot: {}", entry.slot);
        println!("Entry Index: {}", entry.index);
        println!("Number of Hashes: {}", entry.num_hashes);

        if !entry.hash.is_empty() {
            println!("Hash: {}", hex::encode(&entry.hash));
        }

        println!("Executed Transaction Count: {}", entry.executed_transaction_count);
        if entry.starting_transaction_index > 0 {
            println!("Starting Transaction Index: {}", entry.starting_transaction_index);
        }

        println!("\n{}\n", "=".repeat(50));
    }

    /// Establishes connection and handles the subscription stream
    async fn connect(&mut self, request: SubscribeRequest) -> Result<()> {
        let request = request.clone();
        let (mut subscribe_tx, mut stream) = self.client.subscribe_with_request(Some(request.clone())).await?;

        self.is_connected = true;
        self.reconnect_attempts = 0;

        while let Some(message) = stream.next().await {
            match message {
                Ok(msg) => {
                    match msg.update_oneof {
                        Some(UpdateOneof::Entry(entry)) => {
                            self.handle_entry_update(&entry);
                        }
                        Some(UpdateOneof::Ping(_)) => {
                            subscribe_tx
                                .send(SubscribeRequest {
                                    ping: Some(SubscribeRequestPing { id: 1 }),
                                    ..Default::default()
                                })
                                .await?;
                        }
                        Some(UpdateOneof::Pong(_)) => {} // Ignore pong responses
                        _ => {
                            println!("Other update received: {:?}", msg);
                        }
                    }
                }
                Err(err) => {
                    error!("Error: {:?}", err);
                    self.is_connected = false;
                    Box::pin(self.reconnect(request.clone())).await?;
                    break;
                }
            }
        }

        Ok(())
    }

    /// Attempts to reconnect when the connection is lost
    async fn reconnect(&mut self, request: SubscribeRequest) -> Result<()> {
        if self.reconnect_attempts >= self.max_reconnect_attempts {
            println!("Max reconnection attempts reached");
            return Ok(());
        }

        self.reconnect_attempts += 1;
        println!("Reconnecting... Attempt {}", self.reconnect_attempts);

        let backoff = self.reconnect_interval * std::cmp::min(self.reconnect_attempts, 5);
        tokio::time::sleep(backoff).await;

        Box::pin(self.connect(request)).await
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize gRPC stream manager
    let manager = GrpcStreamManager::new(
        "your-grpc-url:2053",
        "your-x-token",
    ).await?;

    let mut manager_lock = manager.lock().await;

    // Create subscription request for entries only
    let request = SubscribeRequest {
        entry: HashMap::from_iter(vec![(
            "entry".to_string(),
            SubscribeRequestFilterEntry {},
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    println!("Starting entry monitoring...");
    
    // Start the subscription
    let result = manager_lock.connect(request).await;
    if let Err(e) = &result {
        println!("Subscription error: {:?}", e);
    }
    result?;

    Ok(())
}

Go Implementation

package main

import (
    "context"
    "crypto/tls"
    "encoding/hex"
    "fmt"
    "log"
    "sync"
    "time"

    pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/metadata"
)

func handleEntryUpdate(entry *pb.SubscribeUpdateEntry) {
    fmt.Printf("Entry Update:\n")
    fmt.Printf("  Slot: %d\n", entry.Slot)
    fmt.Printf("  Index: %d\n", entry.Index)
    fmt.Printf("  Num Hashes: %d\n", entry.NumHashes)
    fmt.Printf("  Hash: %s\n", hex.EncodeToString(entry.Hash))
    fmt.Printf("  Transaction Count: %d\n", entry.ExecutedTransactionCount)

    if entry.Transactions != nil {
        fmt.Printf("Processing %d transactions...\n", len(entry.Transactions))
        for i, tx := range entry.Transactions {
            fmt.Printf("Transaction %d:\n", i+1)
            fmt.Printf("  Signature: %s\n", hex.EncodeToString(tx.Signature))
            fmt.Printf("  Is Vote: %v\n", tx.IsVote)

            if tx.Meta != nil {
                fmt.Printf("  Status: %s\n", getTransactionStatus(tx.Meta.Err))
                fmt.Printf("  Fee: %d\n", tx.Meta.Fee)
                fmt.Printf("  Pre Balances: %v\n", tx.Meta.PreBalances)
                fmt.Printf("  Post Balances: %v\n", tx.Meta.PostBalances)
            }
        }
    }
}

func getTransactionStatus(err interface{}) string {
    if err == nil {
        return "Success"
    }
    return "Failed"
}

func main() {
    ctx := context.Background()

    // Create manager with data handler and x-token
    manager, err := NewGrpcStreamManager(
        "your-dedicated-node-url:2053",
        "x-token",
        handleEntryUpdate,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer manager.Close()

    // Create subscription request for entry monitoring
    entryFilter := &pb.SubscribeRequestFilterEntry{
        Filter: &pb.EntryFilter{
            Voting: false,
            Failed: false,
        },
    }

    req := &pb.SubscribeRequest{
        Entry:      entryFilter,
        Commitment: pb.CommitmentLevel_CONFIRMED,
    }

    // Connect and handle updates
    if err := manager.Connect(ctx, req); err != nil {
        log.Fatal(err)
    }

    // Keep the main goroutine running
    select {}
}