import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc";
/**
 * Maintains a gRPC subscription for a single merkle tree and forwards the
 * account and transaction updates it receives to a caller-supplied callback.
 *
 * The manager remembers the address passed to {@link connect} and reuses it
 * when it reconnects after a stream error, an unexpected close, or a failed
 * connection attempt. Reconnects use exponential backoff starting at
 * `baseReconnectDelay` milliseconds and stop after `maxReconnectAttempts`
 * consecutive failures. Calling {@link disconnect} cancels any pending
 * reconnect.
 */
class CNFTStreamManager {
  private client: Client;
  // Typed as `any` because the stream type returned by `client.subscribe()`
  // is not imported in this file.
  private stream: any;
  private isConnected = false;
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 10;
  private readonly baseReconnectDelay = 1000;
  /** Address given to the most recent `connect` call; reused on reconnect. */
  private merkleTreeAddress: string | undefined;
  /** Handle of the pending reconnect timer, if one is scheduled. */
  private reconnectTimer: ReturnType<typeof setTimeout> | undefined;
  /** Set by `disconnect()` so that the resulting close does not reconnect. */
  private closedByCaller = false;

  /**
   * @param endpoint gRPC endpoint URL handed to the client.
   * @param apiKey Access token handed to the client.
   * @param onCNFTEvent Called with `{ type: 'account_update', account }` or
   *   `{ type: 'transaction', transaction }` for every matching update.
   */
  constructor(
    private endpoint: string,
    private apiKey: string,
    private onCNFTEvent: (data: any) => void
  ) {
    this.client = new Client(endpoint, apiKey, {
      "grpc.max_receive_message_length": 64 * 1024 * 1024
    });
  }

  /**
   * Opens a subscription stream for `merkleTreeAddress`.
   *
   * Any stream opened by an earlier call is released first. Failures are
   * logged and trigger a backoff reconnect instead of rejecting the returned
   * promise.
   *
   * @param merkleTreeAddress Account to watch, both as an account
   *   subscription and as a transaction `accountInclude` filter.
   */
  async connect(merkleTreeAddress: string): Promise<void> {
    this.merkleTreeAddress = merkleTreeAddress;
    this.closedByCaller = false;
    // A manual connect supersedes any reconnect that is still waiting.
    this.clearReconnectTimer();
    this.releaseStream();

    try {
      console.log(`Connecting to gRPC endpoint: ${this.endpoint}`);
      const subscribeRequest = this.buildSubscribeRequest(merkleTreeAddress);

      const stream = await this.client.subscribe();
      if (this.closedByCaller) {
        // disconnect() was called while the stream was being opened.
        stream.end();
        return;
      }
      this.stream = stream;

      stream.on("data", (data: any) => {
        if (data.account) {
          console.log("Merkle tree account update:", data.account);
          this.onCNFTEvent({
            type: 'account_update',
            account: data.account
          });
        }
        if (data.transaction) {
          console.log("Transaction involving cNFT:", data.transaction.signature);
          this.onCNFTEvent({
            type: 'transaction',
            transaction: data.transaction
          });
        }
      });

      stream.on("error", (error: any) => {
        // Ignore events from a stream that has already been replaced.
        if (this.stream !== stream) {
          return;
        }
        console.error("Stream error:", error);
        this.handleReconnect();
      });

      stream.on("close", () => {
        if (this.stream !== stream) {
          return;
        }
        console.log("Stream closed");
        this.isConnected = false;
        // A close the caller did not ask for would otherwise leave the
        // manager silently idle.
        if (!this.closedByCaller) {
          this.handleReconnect();
        }
      });

      // Send subscription request
      await this.writeRequest(subscribeRequest);
      this.isConnected = true;
      this.reconnectAttempts = 0;
      console.log("Successfully connected to gRPC stream");
    } catch (error) {
      console.error("Failed to connect:", error);
      this.handleReconnect();
    }
  }

  /** Builds the subscribe request that targets `merkleTreeAddress`. */
  private buildSubscribeRequest(merkleTreeAddress: string): SubscribeRequest {
    return {
      commitment: CommitmentLevel.CONFIRMED,
      accounts: {
        merkleTreeAccount: {
          account: [merkleTreeAddress],
          owner: [],
          filters: []
        }
      },
      accountsDataSlice: [],
      transactions: {
        cnftTransactions: {
          accountInclude: [merkleTreeAddress],
          accountExclude: [],
          accountRequired: [],
          vote: false,
          failed: false
        }
      },
      blocks: {},
      blocksMeta: {},
      entry: {},
      slots: {},
      transactionsStatus: {},
      ping: { id: 1 } // Keep connection alive
    };
  }

  /** Writes `request` to the current stream; rejects if the write fails. */
  private async writeRequest(request: SubscribeRequest): Promise<void> {
    return new Promise((resolve, reject) => {
      this.stream.write(request, (err: any) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }

  /**
   * Schedules one reconnect to the remembered address with exponential
   * backoff. Does nothing when a reconnect is already pending, when the
   * caller has disconnected, or when the attempt limit has been reached.
   */
  private handleReconnect(): void {
    if (this.closedByCaller || this.reconnectTimer !== undefined) {
      return;
    }
    const address = this.merkleTreeAddress;
    if (address === undefined) {
      return;
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }
    this.reconnectAttempts++;
    const delay = this.baseReconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
    console.log(`Reconnecting in ${delay}ms... (attempt ${this.reconnectAttempts})`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = undefined;
      this.connect(address).catch(console.error);
    }, delay);
  }

  /** Cancels the pending reconnect timer, if any. */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }
  }

  /**
   * Ends the current stream, if any, and forgets it so that its later
   * `error`/`close` events are treated as stale.
   */
  private releaseStream(): void {
    const previous = this.stream;
    if (!previous) {
      return;
    }
    this.stream = undefined;
    previous.end();
  }

  /**
   * Stops the subscription: cancels any pending reconnect, ends the stream
   * and closes the client.
   */
  disconnect(): void {
    this.closedByCaller = true;
    this.clearReconnectTimer();
    this.releaseStream();
    this.client.close();
    this.isConnected = false;
  }
}
// Usage example
/**
 * Example entry point: builds a CNFTStreamManager with placeholder
 * credentials, connects it to a placeholder merkle tree address, and
 * installs a SIGINT handler that disconnects the manager and exits.
 */
async function monitorCNFTTree() {
  const merkleTreeAddress = "MERKLE_TREE_ADDRESS";
  const apiKey = "your-api-key";
  // Choose your region
  const endpoint = "https://laserstream-mainnet-ewr.helius-rpc.com";

  // Logs every event, then branches on the event's `type` tag.
  const handleEvent = (eventData: any): void => {
    console.log("cNFT Event received:", eventData);
    switch (eventData.type) {
      case "account_update":
        // Handle account update - tree root may have changed
        console.log("Merkle tree state changed");
        break;
      case "transaction":
        // Parse transaction for mint/transfer/burn operations
        console.log("Transaction affecting cNFT detected");
        break;
      default:
        break;
    }
  };

  const manager = new CNFTStreamManager(endpoint, apiKey, handleEvent);
  await manager.connect(merkleTreeAddress);

  // Keep running until interrupted
  const shutDown = (): void => {
    console.log("Shutting down...");
    manager.disconnect();
    process.exit(0);
  };
  process.on("SIGINT", shutDown);
}

monitorCNFTTree().catch(console.error);