Transaction Monitoring with Yellowstone gRPC

This guide demonstrates how to use Yellowstone gRPC to monitor transactions on Solana. You’ll learn how to track transactions in real-time with filtering options and reliable reconnection handling.

TypeScript Implementation

import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
import * as bs58 from 'bs58';

class GrpcStreamManager {
    private client: Client;
    private stream: any;
    private isConnected: boolean = false;
    private reconnectAttempts: number = 0;
    private readonly maxReconnectAttempts: number = 10;
    private readonly reconnectInterval: number = 5000; // 5 seconds
    private readonly dataHandler: (data: any) => void;

    constructor(
        endpoint: string,
        authToken: string,
        dataHandler: (data: any) => void
    ) {
        this.client = new Client(
            endpoint,
            authToken,
            { "grpc.max_receive_message_length": 64 * 1024 * 1024 }
        );
        this.dataHandler = dataHandler;
    }

    async connect(subscribeRequest: SubscribeRequest): Promise<void> {
        try {
            this.stream = await this.client.subscribe();
            this.isConnected = true;
            this.reconnectAttempts = 0;

            this.stream.on("data", this.handleData.bind(this));
            this.stream.on("error", this.handleError.bind(this));
            this.stream.on("end", () => this.handleDisconnect(subscribeRequest));
            this.stream.on("close", () => this.handleDisconnect(subscribeRequest));

            await this.write(subscribeRequest);
            this.startPing();
        } catch (error) {
            console.error("Connection error:", error);
            await this.reconnect(subscribeRequest);
        }
    }

    private async write(req: SubscribeRequest): Promise<void> {
        return new Promise((resolve, reject) => {
            this.stream.write(req, (err: any) => err ? reject(err) : resolve());
        });
    }

    private async reconnect(subscribeRequest: SubscribeRequest): Promise<void> {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error("Max reconnection attempts reached");
            return;
        }

        this.reconnectAttempts++;
        console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

        setTimeout(async () => {
            try {
                await this.connect(subscribeRequest);
            } catch (error) {
                console.error("Reconnection failed:", error);
                await this.reconnect(subscribeRequest);
            }
        }, this.reconnectInterval * Math.min(this.reconnectAttempts, 5));
    }

    private startPing(): void {
        setInterval(() => {
            if (this.isConnected) {
                this.write({
                    ping: { id: 1 },
                    accounts: {},
                    accountsDataSlice: [],
                    transactions: {},
                    blocks: {},
                    blocksMeta: {},
                    entry: {},
                    slots: {},
                    transactionsStatus: {},
                }).catch(console.error);
            }
        }, 30000);
    }

    private handleData(data: any): void {
        try {
            const processed = this.processBuffers(data);
            this.dataHandler(processed);
        } catch (error) {
            console.error("Error processing data:", error);
        }
    }

    private handleError(error: any): void {
        console.error("Stream error:", error);
        this.isConnected = false;
    }

    private handleDisconnect(subscribeRequest: SubscribeRequest): void {
        console.log("Stream disconnected");
        this.isConnected = false;
        this.reconnect(subscribeRequest);
    }

    private processBuffers(obj: any): any {
        if (!obj) return obj;
        if (Buffer.isBuffer(obj) || obj instanceof Uint8Array) {
            return bs58.default.encode(obj);
        }
        if (Array.isArray(obj)) {
            return obj.map(item => this.processBuffers(item));
        }
        if (typeof obj === 'object') {
            return Object.fromEntries(
                Object.entries(obj).map(([k, v]) => [k, this.processBuffers(v)])
            );
        }
        return obj;
    }
}

// Transaction monitoring implementation
async function monitorTransactions() {
    const manager = new GrpcStreamManager(
        "your-grpc-url:2053",
        "your-x-token",
        handleTransactionUpdate
    );

    // Create subscription request for monitoring program transactions
    const subscribeRequest: SubscribeRequest = {
        transactions: {
            client: {
                accountInclude: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"],
                accountExclude: [],
                accountRequired: [],
                vote: false,
                failed: false
            }
        },
        commitment: CommitmentLevel.CONFIRMED,
        accounts: {},
        accountsDataSlice: [],
        blocks: {},
        blocksMeta: {},
        entry: {},
        slots: {},
        transactionsStatus: {}
    };

    await manager.connect(subscribeRequest);
}

function handleTransactionUpdate(data: any): void {
    // Check if we have transaction data in the correct structure
    if (data?.transaction?.transaction) {
        const txInfo = data.transaction.transaction;
        console.log('\n=== Transaction Details ===');
        console.log(`Signature: ${txInfo.signature}`);
        console.log(`Slot: ${data.transaction.slot}`);
        console.log(`Is Vote: ${txInfo.isVote}`);
        
        if (txInfo.meta) {
            console.log(`Fee: ${txInfo.meta.fee} lamports`);
            console.log(`Compute Units: ${txInfo.meta.computeUnitsConsumed}`);
            console.log(`Status: ${txInfo.meta.err ? 'Failed' : 'Success'}`);

            // Token balance changes
            if (txInfo.meta.preTokenBalances?.length > 0) {
                console.log('\n=== Token Balance Changes ===');
                txInfo.meta.preTokenBalances.forEach((preBalance: any, index: number) => {
                    const postBalance = txInfo.meta.postTokenBalances[index];
                    if (preBalance && postBalance) {
                        console.log(`Token Account: ${preBalance.mint}`);
                        console.log(`  Pre Balance: ${preBalance.uiTokenAmount?.uiAmount}`);
                        console.log(`  Post Balance: ${postBalance.uiTokenAmount?.uiAmount}`);
                        if (preBalance.owner) {
                            console.log(`  Owner: ${preBalance.owner}`);
                        }
                    }
                });
            }

            // Log messages
            if (txInfo.meta.logMessages?.length > 0) {
                console.log('\n=== Program Logs ===');
                txInfo.meta.logMessages.forEach((log: string) => {
                    console.log(`  ${log}`);
                });
            }
        }

        // Transaction instructions
        if (txInfo.transaction?.message) {
            const message = txInfo.transaction.message;
            
            console.log('\n=== Account Keys ===');
            if (Array.isArray(message.accountKeys)) {
                message.accountKeys.forEach((key: Uint8Array, index: number) => {
                    try {
                        console.log(`  ${index}: ${bs58.default.encode(key)}`);
                    } catch (error) {
                        console.log(`  ${index}: [Unable to encode key]`);
                    }
                });
            }

            // Instructions
            // TODO: Add instructions processing logic
            if (Array.isArray(message.instructions)) {
                console.log('\n=== Instructions ===');
                message.instructions.forEach((ix: any, index: number) => {
                    try {
                        console.log(`\nInstruction ${index + 1}:`);
                        console.log(`  Program: ${bs58.default.encode(message.accountKeys[ix.programIdIndex])}`);
                        console.log(`  Accounts: ${ix.accounts.map((idx: number) => 
                            bs58.default.encode(message.accountKeys[idx])
                        )}`);
                        if (ix.data) {
                            console.log(`  Data: ${bs58.default.encode(ix.data)}`);
                        }
                    } catch (error) {
                        console.log(`  [Unable to decode instruction ${index + 1}]`);
                    }
                });
            }

            // Process inner instructions
            // TODO: Add inner instructions processing logic
            if (txInfo.meta?.innerInstructions?.length > 0) {
                console.log('\n=== Inner Instructions ===');
                txInfo.meta.innerInstructions.forEach((inner: any, index: number) => {
                    console.log(`\nInner Instruction Set ${index + 1}:`);
                    inner.instructions.forEach((ix: any, i: number) => {
                        try {
                            console.log(`  Instruction ${i + 1}:`);
                            console.log(`    Program: ${bs58.default.encode(message.accountKeys[ix.programIdIndex])}`);
                            console.log(`    Accounts: ${ix.accounts.map((idx: number) => 
                                bs58.default.encode(message.accountKeys[idx])
                            )}`);
                            if (ix.data) {
                                console.log(`    Data: ${bs58.default.encode(ix.data)}`);
                            }
                        } catch (error) {
                            console.log(`    [Unable to decode instruction]`);
                        }
                    });
                });
            }
        }

        console.log('\n' + '='.repeat(50) + '\n');
    }
}

// Start monitoring
monitorTransactions().catch(console.error);

Transaction Subscription Options

1. Monitor Program Transactions

const subscribeRequest: SubscribeRequest = {
    transactions: {
        accountInclude: ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"], // Token program
        accountExclude: [],
        accountRequired: [],
        vote: false,
        failed: false,
        signature: undefined
    },
    commitment: CommitmentLevel.CONFIRMED,
};

2. Monitor Multiple Programs

const subscribeRequest: SubscribeRequest = {
    transactions: {
        accountInclude: [
            "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", // Token program
            "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL", // Associated Token program
        ],
        accountExclude: [],
        accountRequired: [],
        vote: false,
        failed: true,
        signature: undefined
    },
    commitment: CommitmentLevel.CONFIRMED,
};

3. Monitor All Transactions

const subscribeRequest: SubscribeRequest = {
    transactions: {
        accountInclude: [],
        accountExclude: [],
        accountRequired: [],
        vote: true,
        failed: true,
        signature: undefined
    },
    commitment: CommitmentLevel.CONFIRMED,
};

Data Structures

  1. Transaction Update:
    • Slot number (when transaction was processed)
    • Transaction signature (unique identifier)
    • Transaction message
      • Account keys (involved accounts)
      • Instructions (program calls and data)
      • Recent blockhash
    • Transaction metadata
      • Status (success/failure)
      • Fee amount
      • Compute units consumed
      • Log messages
      • Inner instructions
      • Balance changes
  2. Instruction Details:
    • Program ID (program being called)
    • Account list (accounts used by instruction)
    • Instruction data (program-specific data)
    • Inner instructions (CPI calls)

Rust Implementation

use {
    anyhow::Result,
    futures::{stream::StreamExt, sink::SinkExt},
    log::error,
    std::{collections::HashMap, sync::Arc, time::Duration},
    tokio::sync::Mutex,
    tonic::{metadata::errors::InvalidMetadataValue, transport::Endpoint},
    yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken},
    yellowstone_grpc_proto::{
        geyser::{
            geyser_client::GeyserClient,
            SubscribeRequest,
            SubscribeRequestFilterTransactions,
            subscribe_update::UpdateOneof,
            SubscribeUpdateTransaction,
        },
        prelude::{CommitmentLevel, SubscribeRequestPing},
    },
    tonic_health::pb::health_client::HealthClient,
    solana_sdk::pubkey::Pubkey,
    hex,
    tokio::sync::mpsc,
};

/// Manager for handling gRPC stream connections and transaction updates
struct GrpcStreamManager {
    client: GeyserGrpcClient<InterceptorXToken>,
    is_connected: bool,
    reconnect_attempts: u32,
    max_reconnect_attempts: u32,
    reconnect_interval: Duration,
}

impl GrpcStreamManager {
    /// Handles transaction update messages from the gRPC stream
    /// This function can be customized based on your requirements:
    /// - Store transactions in a database
    /// - Trigger specific actions based on transaction contents
    /// - Filter for specific types of transactions
    /// - Transform data into your required format
    /// 
    /// # Arguments
    /// * `transaction_update` - The transaction update containing all details
    fn handle_transaction_update(&self, transaction_update: &SubscribeUpdateTransaction) {
        if let Some(transaction) = &transaction_update.transaction {
            println!("Transaction Update:");
            println!("  Signature: {}", hex::encode(&transaction.signature));
            
            if let Some(meta) = &transaction.meta {
                println!("  Status: {}", if meta.err.is_none() { "Success" } else { "Failed" });
                println!("  Fee: {:?}", meta.fee);
                println!("  Compute Units: {:?}", meta.compute_units_consumed);
            }

            if let Some(transaction_message) = &transaction.transaction {
                if let Some(message) = &transaction_message.message {
                    println!("Account Keys:");
                    for (i, key) in message.account_keys.iter().enumerate() {
                        if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
                            println!("  {}: {}", i, pubkey);
                        } else {
                            println!("  {}: {}", i, hex::encode(key));
                        }
                    }

                    println!("Instructions:");
                    for (i, instruction) in message.instructions.iter().enumerate() {
                        println!("Instruction {}:", i + 1);
                        if let Some(program_key) = message.account_keys.get(instruction.program_id_index as usize) {
                            if let Ok(pubkey) = Pubkey::try_from(program_key.as_slice()) {
                                println!("  Program: {}", pubkey);
                            } else {
                                println!("  Program: {}", hex::encode(program_key));
                            }
                        }
                        
                        println!("  Accounts:");
                        for &account_idx in &instruction.accounts {
                            if let Some(key) = message.account_keys.get(account_idx as usize) {
                                if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
                                    println!("    {}", pubkey);
                                } else {
                                    println!("    {}", hex::encode(key));
                                }
                            }
                        }
                        println!("  Data: {}", hex::encode(&instruction.data));
                    }

                    if let Some(meta) = &transaction.meta {
                        if !meta.log_messages.is_empty() {
                            println!("Log Messages:");
                            for log in &meta.log_messages {
                                println!("  {}", log);
                            }
                        }

                        println!("Balance Changes:");
                        for i in 0..meta.pre_balances.len().min(meta.post_balances.len()) {
                            let pre = meta.pre_balances[i];
                            let post = meta.post_balances[i];
                            if pre != post {
                                if let Some(key) = message.account_keys.get(i) {
                                    if let Ok(pubkey) = Pubkey::try_from(key.as_slice()) {
                                        println!("  Account {}: {} -> {} (Δ{})",
                                            pubkey,
                                            pre, post, post.saturating_sub(pre));
                                    } else {
                                        println!("  Account {}: {} -> {} (Δ{})",
                                            hex::encode(key),
                                            pre, post, post.saturating_sub(pre));
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /// Creates a new GrpcStreamManager instance
    /// 
    /// # Arguments
    /// * `endpoint` - The gRPC endpoint URL
    /// * `x_token` - Authentication token for the endpoint
    async fn new(endpoint: &str, x_token: &str) -> Result<Arc<Mutex<GrpcStreamManager>>> {
        let interceptor = InterceptorXToken {
            x_token: Some(x_token.parse().map_err(|e: InvalidMetadataValue| anyhow::Error::from(e))?),
            x_request_snapshot: true,
        };

        let channel = Endpoint::from_shared(endpoint.to_string())?
            .connect_timeout(Duration::from_secs(10))
            .timeout(Duration::from_secs(10))
            .connect()
            .await
            .map_err(|e| anyhow::Error::from(e))?;

        let client = GeyserGrpcClient::new(
            HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
            GeyserClient::with_interceptor(channel, interceptor),
        );

        Ok(Arc::new(Mutex::new(GrpcStreamManager {
            client,
            is_connected: false,
            reconnect_attempts: 0,
            max_reconnect_attempts: 10,
            reconnect_interval: Duration::from_secs(5),
        })))
    }

    /// Establishes connection and handles the subscription stream
    /// 
    /// # Arguments
    /// * `request` - The subscription request containing transaction filters and other parameters
    async fn connect(&mut self, request: SubscribeRequest) -> Result<()> {
        let request = request.clone();
        let (mut subscribe_tx, mut stream) = self.client.subscribe_with_request(Some(request.clone())).await?;

        self.is_connected = true;
        self.reconnect_attempts = 0;

        // Create a channel for sending ping requests
        let (ping_sender, mut ping_receiver) = mpsc::channel(10);
        
        // Add client-initiated ping mechanism
        let ping_handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            loop {
                interval.tick().await;
                if let Err(e) = ping_sender.send(()).await {
                    error!("Failed to send ping signal: {:?}", e);
                    break;
                }
            }
        });

        // Process both stream messages and ping requests
        loop {
            tokio::select! {
                Some(message) = stream.next() => {
                    match message {
                        Ok(msg) => {
                            match msg.update_oneof {
                                Some(UpdateOneof::Transaction(transaction)) => {
                                    self.handle_transaction_update(&transaction);
                                }
                                Some(UpdateOneof::Ping(_)) => {
                                    subscribe_tx
                                        .send(SubscribeRequest {
                                            ping: Some(SubscribeRequestPing { id: 1 }),
                                            ..Default::default()
                                        })
                                        .await?;
                                }
                                Some(UpdateOneof::Pong(_)) => {} // Ignore pong responses
                                _ => {
                                    println!("Other update received: {:?}", msg);
                                }
                            }
                        }
                        Err(err) => {
                            error!("Error: {:?}", err);
                            self.is_connected = false;
                            ping_handle.abort(); // Cleanup ping task
                            Box::pin(self.reconnect(request.clone())).await?;
                            return Ok(());
                        }
                    }
                }
                Some(_) = ping_receiver.recv() => {
                    // Send ping when requested by the ping task
                    if let Err(e) = subscribe_tx
                        .send(SubscribeRequest {
                            ping: Some(SubscribeRequestPing { id: 1 }),
                            ..Default::default()
                        })
                        .await
                    {
                        error!("Failed to send ping: {:?}", e);
                        break;
                    }
                }
                else => break,
            }
        }

        ping_handle.abort(); // Cleanup ping task
        Ok(())
    }

    /// Attempts to reconnect when the connection is lost
    /// 
    /// # Arguments
    /// * `request` - The original subscription request to reestablish the connection
    async fn reconnect(&mut self, request: SubscribeRequest) -> Result<()> {
        if self.reconnect_attempts >= self.max_reconnect_attempts {
            println!("Max reconnection attempts reached");
            return Ok(());
        }

        self.reconnect_attempts += 1;
        println!("Reconnecting... Attempt {}", self.reconnect_attempts);

        let backoff = self.reconnect_interval * std::cmp::min(self.reconnect_attempts, 5);
        tokio::time::sleep(backoff).await;

        Box::pin(self.connect(request)).await
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize gRPC stream manager
    let manager = GrpcStreamManager::new(
        "your-grpc-url:2053",
        "your-x-token",
    ).await?;

    let mut manager_lock = manager.lock().await;

    // Create subscription request for token program transactions
    let request = SubscribeRequest {
        transactions: HashMap::from_iter(vec![(
            "transactions".to_string(),
            SubscribeRequestFilterTransactions {
                vote: Some(false),
                failed: Some(false),
                signature: None,
                account_include: vec!["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA".to_string()],
                account_exclude: vec![],
                account_required: vec![],
            },
        )]),
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ..Default::default()
    };

    println!("Starting subscription for Token Program transactions");
    
    // Start the subscription
    let result = manager_lock.connect(request).await;
    if let Err(e) = &result {
        println!("Subscription error: {:?}", e);
    }
    result?;

    Ok(())
}

Go Implementation

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/mr-tron/base58"
	pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
)

type GrpcStreamManager struct {
	mu                   sync.Mutex
	conn                 *grpc.ClientConn
	client               pb.GeyserClient
	stream               pb.Geyser_SubscribeClient
	isConnected          bool
	reconnectAttempts    int
	maxReconnectAttempts int
	reconnectInterval    time.Duration
	dataHandler          func(*pb.SubscribeUpdate)
	xToken               string
}

func NewGrpcStreamManager(endpoint string, xToken string, dataHandler func(*pb.SubscribeUpdate)) (*GrpcStreamManager, error) {
	// Create gRPC connection with interceptor for x-token
	ctx := metadata.NewOutgoingContext(
		context.Background(),
		metadata.New(map[string]string{"x-token": xToken}),
	)

	// Configure TLS
	config := &tls.Config{
		InsecureSkipVerify: true,
	}

	conn, err := grpc.DialContext(ctx, endpoint,
		grpc.WithTransportCredentials(credentials.NewTLS(config)),
		grpc.WithInitialWindowSize(1<<30),
		grpc.WithInitialConnWindowSize(1<<30),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(64*1024*1024),
			grpc.MaxCallRecvMsgSize(64*1024*1024),
		),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to connect: %v", err)
	}

	return &GrpcStreamManager{
		conn:                 conn,
		client:               pb.NewGeyserClient(conn),
		isConnected:          false,
		reconnectAttempts:    0,
		maxReconnectAttempts: 10,
		reconnectInterval:    5 * time.Second,
		dataHandler:          dataHandler,
		xToken:               xToken,
	}, nil
}

func (m *GrpcStreamManager) Connect(ctx context.Context, req *pb.SubscribeRequest) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	log.Println("Attempting to connect...")

	// Add x-token to context
	ctx = metadata.NewOutgoingContext(
		ctx,
		metadata.New(map[string]string{"x-token": m.xToken}),
	)

	stream, err := m.client.Subscribe(ctx)
	if err != nil {
		log.Printf("Failed to subscribe: %v", err)
		return m.reconnect(ctx, req)
	}

	if err := stream.Send(req); err != nil {
		log.Printf("Failed to send request: %v", err)
		return m.reconnect(ctx, req)
	}

	m.stream = stream
	m.isConnected = true
	m.reconnectAttempts = 0
	log.Println("Connection established")

	// Start ping goroutine
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if !m.isConnected {
					return
				}
				pingReq := &pb.SubscribeRequest{
					Ping: &pb.SubscribeRequestPing{Id: 1},
				}
				if err := m.stream.Send(pingReq); err != nil {
					log.Printf("Ping failed: %v", err)
					m.handleDisconnect(ctx, req)
					return
				}
			}
		}
	}()

	// Process updates
	go func() {
		for {
			update, err := m.stream.Recv()
			if err != nil {
				log.Printf("Stream error: %v", err)
				m.handleDisconnect(ctx, req)
				return
			}

			m.dataHandler(update)
		}
	}()

	return nil
}

func (m *GrpcStreamManager) reconnect(ctx context.Context, req *pb.SubscribeRequest) error {
	if m.reconnectAttempts >= m.maxReconnectAttempts {
		return fmt.Errorf("max reconnection attempts reached")
	}

	m.reconnectAttempts++
	log.Printf("Reconnecting... Attempt %d", m.reconnectAttempts)

	backoff := m.reconnectInterval * time.Duration(min(m.reconnectAttempts, 5))
	time.Sleep(backoff)

	return m.Connect(ctx, req)
}

func (m *GrpcStreamManager) handleDisconnect(ctx context.Context, req *pb.SubscribeRequest) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if !m.isConnected {
		return
	}

	m.isConnected = false
	if err := m.reconnect(ctx, req); err != nil {
		log.Printf("Failed to reconnect: %v", err)
	}
}

func (m *GrpcStreamManager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.isConnected = false
	if m.conn != nil {
		return m.conn.Close()
	}
	return nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func handleAccountUpdate(update *pb.SubscribeUpdate) {
	if tx := update.GetTransaction(); tx != nil {
		log.Printf("Transaction Update:\n")
		log.Printf("  Slot: %d\n", tx.GetSlot())

		txInfo := tx.GetTransaction()
		if txInfo != nil {
			// Print signature if available
			if len(txInfo.GetSignature()) > 0 {
				log.Printf("  Signature: %s\n", base58.Encode(txInfo.GetSignature()))
			}

			// Print if it's a vote transaction
			log.Printf("  Is Vote: %v\n", txInfo.GetIsVote())

			// Print transaction index
			log.Printf("  Transaction Index: %d\n", txInfo.GetIndex())

			// Print transaction details
			if tx := txInfo.GetTransaction(); tx != nil {
				if msg := tx.GetMessage(); msg != nil {
					if accounts := msg.GetAccountKeys(); len(accounts) > 0 {
						log.Printf("  Account Keys:\n")
						for _, acc := range accounts {
							if len(acc) > 0 {
								log.Printf("    - %s\n", base58.Encode(acc))
							}
						}
					}
				}
			}

			// Print status and metadata
			if meta := txInfo.GetMeta(); meta != nil {
				log.Printf("  Status: %v\n", meta.GetErr() == nil)
				log.Printf("  Fee: %d\n", meta.GetFee())

				// Print log messages if any
				if len(meta.GetLogMessages()) > 0 {
					log.Printf("  Log Messages:\n")
					for _, msg := range meta.GetLogMessages() {
						log.Printf("    - %s\n", msg)
					}
				}
			}
		}
	}
}

func boolPtr(b bool) *bool {
	return &b
}

func main() {
	ctx := context.Background()

	// Create manager with data handler and x-token
	manager, err := NewGrpcStreamManager(
		"your-grpc-url:2053",
        	"your-x-token",
		handleAccountUpdate,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer manager.Close()

	// Create subscription request for token program transactions
	transactions := make(map[string]*pb.SubscribeRequestFilterTransactions)
	transactions["transactions"] = &pb.SubscribeRequestFilterTransactions{
		Vote:            boolPtr(false),
		Failed:          boolPtr(false),
		AccountInclude:  []string{"TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"},
		AccountExclude:  []string{},
		AccountRequired: []string{},
	}

	commitment := pb.CommitmentLevel_CONFIRMED
	req := &pb.SubscribeRequest{
		Transactions: transactions,
		Commitment:   &commitment,
	}

	// Connect and handle updates
	if err := manager.Connect(ctx, req); err != nil {
		log.Fatal(err)
	}

	// Keep the main goroutine running
	select {}
}