The Problem with Waiting

Large language models are slow. A typical response takes seconds — sometimes tens of seconds — to generate. During that time, the user stares at an empty chat window, wondering if anything is happening at all.

Every major LLM chat interface solved this the same way: stream tokens as they arrive. ChatGPT, Claude, Gemini — they all render partial text while the model is still thinking. The experience feels responsive even when the full response takes 15 seconds.

But what about Telegram bots? The standard sendMessage API waits until you have the complete text, then delivers it in one shot. For short replies, this is fine. For LLM-generated answers that span multiple paragraphs, it creates an uncomfortable pause where the user has no feedback at all.

Telegram’s sendMessageDraft API changes this. It lets you push partial text to a chat while still generating the full response — the same streaming UX that users expect from any modern AI interface, but inside Telegram.

This article walks through how to build a fully reactive streaming pipeline: from LLM token generation through NDJSON over HTTP to progressive Telegram draft updates. The code comes from a production system where a RAG-powered assistant serves users through Telegram, built on the reactive bot framework and the architectural patterns covered in the reactive architecture series.


Architecture Overview

The streaming pipeline spans three services connected through Apache Pulsar:

      sequenceDiagram
    participant U as Telegram User
    participant TP as Telegram Poller
    participant P as Pulsar
    participant TW as Telegram Worker
    participant CS as Chat Service
    participant LLM as LLM

    U->>TP: sends message
    TP->>P: publish update
    P->>TW: deliver update
    TW->>CS: POST /chat<br/>Accept: application/x-ndjson
    CS->>LLM: streaming chat request

    loop Token streaming
        LLM-->>CS: partial token
        CS-->>TW: NDJSON line
        TW->>TW: buffer(2s) + accumulate
        TW->>P: publish draft reply
        P->>TP: deliver reply
        TP->>U: sendMessageDraft(draftId, accumulated)
    end

    Note over TW: stream complete
    TW->>P: publish final reply
    P->>TP: deliver final reply
    TP->>U: sendMessage(full text + keyboard)
    

Each service is decoupled through Pulsar topics, which means the poller — the only component that holds the Telegram bot token — never calls the LLM directly. The worker handles business logic and streaming. The chat service owns the LLM interaction.

This separation matters: the poller scales horizontally by bot count, the worker scales by message volume, and the chat service scales by LLM capacity. Each can be tuned independently.


Streaming Tokens from the LLM

LangChain4j TokenStream

LangChain4j provides two ways to interact with a chat model: synchronous chat() that blocks until the full response is ready, and streaming chatStream() that returns a TokenStream. The streaming variant is what makes progressive delivery possible.

TokenStream is a callback-driven API. You register three handlers and then call .start() to begin generation:

  • onPartialResponse(String token) — called for every token fragment the model produces. A single word like “architecture” might arrive as one token, or a longer sentence might arrive as multiple fragments. The callback fires as soon as each token is available from the model’s output stream.
  • onCompleteResponse(ChatResponse response) — called exactly once after the last token. The ChatResponse contains the full AiMessage with the assembled text, plus a TokenUsage object with input and output token counts. This is the place to record billing metrics or persist the complete response.
  • onError(Throwable error) — called if the model request fails, the connection drops, or tool execution throws an exception during streaming.
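
To make the register-then-start contract concrete without an LLM on hand, here is a minimal pure-Java stand-in that mimics the shape of the three handlers. This is NOT the real LangChain4j TokenStream type, just a sketch of its callback flow:

```java
import java.util.List;
import java.util.function.Consumer;

// Minimal stand-in for the TokenStream callback contract (not the real
// LangChain4j type): register handlers, then start() drives emission.
class FakeTokenStream {
    private Consumer<String> onPartial = t -> {};
    private Runnable onComplete = () -> {};
    private Consumer<Throwable> onError = e -> {};
    private final List<String> tokens;

    FakeTokenStream(List<String> tokens) { this.tokens = tokens; }

    FakeTokenStream onPartialResponse(Consumer<String> handler) {
        this.onPartial = handler; return this;
    }
    FakeTokenStream onCompleteResponse(Runnable handler) {
        this.onComplete = handler; return this;
    }
    FakeTokenStream onError(Consumer<Throwable> handler) {
        this.onError = handler; return this;
    }

    // Nothing happens until start() is called, same as the real API.
    void start() {
        try {
            tokens.forEach(onPartial); // one callback per token fragment
            onComplete.run();          // fires exactly once, after the last token
        } catch (RuntimeException e) {
            onError.accept(e);
        }
    }
}
```

Appending each partial response to a StringBuilder and flushing on completion is exactly the accumulation pattern the worker uses later in the pipeline.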

The interface is defined declaratively using LangChain4j’s AiServices proxy pattern:

public interface ChatAssistant {

    TokenStream chatStream(@MemoryId String memoryId,
                           @UserMessage UserMessage userMessage);
}

At runtime, AiServices.builder(ChatAssistant.class) generates an implementation that routes chatStream() calls to the configured StreamingChatModel. The @MemoryId annotation maps each conversation to its own chat memory, so the model sees the correct history. The @UserMessage annotation tells LangChain4j which parameter carries the user’s input.

The StreamingChatModel connects to the LLM provider’s streaming API — OpenAI’s /v1/chat/completions with stream: true, or any compatible endpoint:

StreamingChatModel streamingModel = OpenAiStreamingChatModel.builder()
        .baseUrl(llmUrl)
        .modelName(modelName)
        .apiKey(apiKey)
        .temperature(0.0)
        .build();

ChatAssistant assistant = AiServices.builder(ChatAssistant.class)
        .streamingChatModel(streamingModel)
        .chatMemoryProvider(memoryProvider)
        .contentRetriever(ragRetriever)
        .build();

Bridging TokenStream to Flux

TokenStream is callback-based. Project Reactor is demand-based. Flux.create bridges the two:

@PostMapping(produces = {
        MediaType.APPLICATION_JSON_VALUE,
        MediaType.APPLICATION_NDJSON_VALUE,
})
public Flux<String> chat(@RequestBody Message message) {

    TokenStream tokenStream = assistant.chatStream(
            message.conversationId(),
            UserMessage.builder()
                       .addContent(TextContent.from(message.text()))
                       .build()
    );

    return Flux.<String>create(sink -> {
        tokenStream.onPartialResponse(sink::next)
                   .onCompleteResponse(response -> {
                       TokenUsage usage = response.tokenUsage();
                       if (usage != null) {
                           log.info("tokens: in={}, out={}",
                                    usage.inputTokenCount(),
                                    usage.outputTokenCount());
                       }
                       sink.complete();
                   })
                   .onError(sink::error)
                   .start();
    });
}

Each onPartialResponse callback pushes a token into the sink. onCompleteResponse logs token usage and completes the stream. .start() is what actually triggers the HTTP request to the LLM — nothing happens until it is called.

One subtlety: TokenStream does not respect back-pressure. Once .start() is called, tokens arrive at whatever rate the model produces them. Flux.create handles this with an internal unbounded buffer — tokens accumulate if downstream is slow. For LLM outputs this is fine — the total response is typically kilobytes, not gigabytes. But it means you should prefer Flux.create over Flux.push: push assumes a single emitting thread, while onPartialResponse may fire from any of the LLM client’s I/O threads.

When a client sends Accept: application/x-ndjson, Spring WebFlux automatically serializes each element as a separate JSON line, flushing it to the HTTP response immediately. For a deeper look at how NDJSON streaming works with Spring WebFlux — including bodyToFlux consumption, bidirectional streaming, back-pressure propagation, and the differences from blocking JSON endpoints — see Streaming Data with Spring WebFlux and NDJSON.
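
On the wire, that body is just one JSON value per line. Assuming each Flux<String> element is serialized as a quoted JSON string (as described above), a JDK-only sketch of recovering the tokens, roughly what bodyToFlux(String.class) does per line, looks like this:

```java
import java.util.List;

// NDJSON body sketch: one JSON value per line. Each Flux<String> element
// arrives as a quoted JSON string; this JDK-only mini-parser (no Jackson)
// strips the quotes and recovers the token fragments.
public class NdjsonWire {

    static List<String> parseTokens(String body) {
        return body.lines()
                   .filter(line -> !line.isBlank())
                   .map(line -> line.substring(1, line.length() - 1)) // strip JSON quotes
                   .toList();
    }

    public static void main(String[] args) {
        String wire = "\"Reactive\"\n\" streams\"\n\" are\"\n";
        System.out.println(String.join("", parseTokens(wire))); // prints: Reactive streams are
    }
}
```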


Consuming the Token Stream Over HTTP

For a backend-to-backend pipeline where both sides use Spring WebFlux, NDJSON is the simpler choice over SSE. The worker consumes the chat service’s streaming response with bodyToFlux:

public Flux<String> sendMessage(String conversationId, String text) {
    return webClient.post()
            .uri(chatServiceUrl, uriBuilder -> uriBuilder.path("/chat").build())
            .contentType(MediaType.APPLICATION_JSON)
            .accept(MediaType.APPLICATION_NDJSON)
            .bodyValue(new Message(conversationId, text))
            .retrieve()
            .bodyToFlux(String.class);
}

bodyToFlux(String.class) reads each NDJSON line from the chunked HTTP response, deserializes it, and emits it downstream — all while maintaining back-pressure through Reactor’s demand signaling. No custom SSE parsers, no BodyExtractors, no data: prefix stripping. For a deeper explanation of NDJSON streaming patterns — including bidirectional streaming, .body(flux, T.class) vs .bodyValue(), and Netty configuration for long-lived streams — see Streaming Data with Spring WebFlux and NDJSON.


The sendMessageDraft API

Telegram’s sendMessageDraft method is designed exactly for this use case. It accepts a draft_id — an arbitrary integer that identifies a draft conversation — and progressively updates the displayed text in the user’s chat.

The reactive client wraps it as a non-blocking Mono:

public Mono<Response<Boolean>> sendMessageDraft(
        long chatId,
        @Nullable Integer messageThreadId,
        int draftId,
        String text,
        @Nullable ParseMode parseMode,
        @Nullable List<MessageEntity> entities) {

    BodyInserters.FormInserter<String> inserter =
            BodyInserters.fromFormData("chat_id", String.valueOf(chatId))
                         .with("draft_id", String.valueOf(draftId))
                         .with("text", text);

    if (messageThreadId != null) {
        inserter = inserter.with("message_thread_id", String.valueOf(messageThreadId));
    }
    if (parseMode != null) {
        inserter = inserter.with("parse_mode", parseMode.name());
    }
    if (entities != null) {
        inserter = inserter.with("entities", toJson(entities));
    }

    return webClient.post()
            .uri(endpoint + "/bot" + token,
                 uriBuilder -> uriBuilder.pathSegment("sendMessageDraft").build())
            .contentType(MediaType.APPLICATION_FORM_URLENCODED)
            .body(inserter)
            .exchangeToMono(clientResponse ->
                    extractResponse(clientResponse,
                            new ParameterizedTypeReference<Response<Boolean>>() {}));
}

The semantics are straightforward: each call with the same draft_id replaces the previous draft text. You accumulate tokens on your side and send the growing text. Telegram updates the bubble in-place — no flicker, no new messages, just the text expanding progressively.

When the LLM finishes, you send a final sendMessage call with the complete text and any reply keyboard. This replaces the draft with a permanent message.

Throughput Limits

Telegram enforces strict rate limits on bot API calls. The global cap is around 30 requests per second across all chats, and individual chats are limited to roughly 1 message per second. sendMessageDraft counts against these same limits — each draft update is a full API call. Exceeding the limit returns a 429 Too Many Requests error with a retry_after field indicating how long to wait.

This means you cannot simply fire a draft update for every token. At 30+ tokens per second from the LLM, you would burn through the per-chat limit instantly and hit the global cap within a few concurrent conversations. The buffer strategy in the next section is not optional — it is a hard requirement imposed by Telegram’s rate limiting.
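
The back-of-the-envelope numbers behind that claim, using illustrative values (30 tokens/s from the model, a 15-second response, the 2-second buffer window introduced below):

```java
// Illustrative arithmetic for Telegram's rate budget (values from the text).
public class RateBudget {
    public static void main(String[] args) {
        int tokensPerSecond = 30;   // typical LLM output rate
        int responseSeconds = 15;   // a long streamed answer
        int bufferSeconds = 2;      // draft-update window

        // One API call per token: hopelessly over the ~1 msg/s per-chat cap.
        int naiveCalls = tokensPerSecond * responseSeconds;

        // One API call per buffer tick: comfortably inside the budget.
        int bufferedCalls = (int) Math.ceil((double) responseSeconds / bufferSeconds);

        System.out.println(naiveCalls);    // 450
        System.out.println(bufferedCalls); // 8
    }
}
```

A single streaming answer would otherwise consume Telegram’s entire 30 RPS global budget on its own; buffering reduces it to a fraction of one chat’s allowance.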


Buffering and Accumulating Tokens

Because of those limits, firing a draft update for every token is off the table. The fix is time-based buffering that collapses many tokens into a single API call:

int draftId = RandomUtils.secure().randomInt();
AtomicReference<StringBuilder> accumulated = new AtomicReference<>(new StringBuilder());

return chatClient.sendMessage(conversationId, msgText)
        .buffer(Duration.ofSeconds(2))
        .filter(parts -> !parts.isEmpty())  // buffer(Duration) emits empty lists on idle windows
        .concatMap(parts -> {
            StringBuilder builder = accumulated.get();
            builder.append(String.join("", parts));

            return telegramClient.sendMessageDraft(
                    chatId, null, draftId,
                    builder.toString(),
                    ParseMode.Markdown, null);
        })
        .then(Mono.fromSupplier(() -> accumulated.get().toString()))
        .flatMap(fullText ->
                telegramClient.sendMessage(
                        chatId, fullText,
                        ParseMode.Markdown, replyKeyboard));

There are three important operators working together here:

.buffer(Duration.ofSeconds(2)) — collects all tokens that arrive within a 2-second window into a List<String>. If the LLM emits 60 tokens in 2 seconds, they arrive as a single batch. This collapses 60 potential API calls into 1, keeping the per-chat rate well under Telegram’s limit.

.concatMap() — processes each batch sequentially. This is critical: flatMap would allow concurrent draft updates, which could arrive at Telegram out of order. concatMap guarantees that draft n completes before draft n+1 starts.

AtomicReference<StringBuilder> — accumulates the full text across all batches. Each draft update sends the entire accumulated text, not just the new tokens. This matches sendMessageDraft semantics — each call replaces the full draft content.

The draftId is generated once per conversation turn using RandomUtils.secure().randomInt(). This integer links all progressive updates to the same draft bubble in the Telegram UI.

The 2-second buffer window is a practical sweet spot. Shorter windows (500ms) would produce more frequent updates — smoother visually, but risk hitting rate limits under concurrent load. Longer windows (5s) are safer but make the streaming feel sluggish. Two seconds gives the user noticeable progress every tick while staying comfortably within Telegram’s throughput budget, even with dozens of concurrent streaming conversations.
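
The accumulate-and-replace behavior can be isolated in plain Java. Each simulated draft update carries the entire text so far, never just the delta (the token batches here are hypothetical stand-ins for what .buffer(2s) emits):

```java
import java.util.ArrayList;
import java.util.List;

// Simulates what the worker sends per buffer tick: the FULL accumulated
// text each time, because sendMessageDraft replaces the draft wholesale.
public class DraftAccumulation {
    public static void main(String[] args) {
        List<List<String>> batches = List.of(           // what .buffer(2s) emits
                List.of("Reactive", " streams"),
                List.of(" are", " a", " specification"));

        StringBuilder accumulated = new StringBuilder();
        List<String> draftUpdates = new ArrayList<>();

        for (List<String> batch : batches) {
            accumulated.append(String.join("", batch));
            draftUpdates.add(accumulated.toString());   // one sendMessageDraft call
        }

        draftUpdates.forEach(System.out::println);
        // Reactive streams
        // Reactive streams are a specification
    }
}
```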


The Outbound Message Pipeline

Draft updates and final messages both flow through Pulsar before reaching the Telegram API. This decoupling allows the worker to fire-and-forget reply actions while the poller handles delivery:

public Mono<MessageId> sendDraftReply(long chatId, int draftId,
                                       String message, ParseMode parseMode) {
    BotReply reply = new BotReply.Draft(chatId, draftId, message, parseMode);

    return outboundChannel.publish(reply);
}

On the poller side, the outbound handler dispatches each reply through pattern matching on Java’s sealed types:

private Mono<Response> dispatch(BotReply reply) {
    return switch (reply) {
        case BotReply.Message r ->
                botClient.sendMessage(r.chatId(), r.text(),
                                      r.parseMode(), r.replyMarkup())
                    .cast(Response.class);

        case BotReply.Draft r ->
                botClient.sendMessageDraft(r.chatId(), null,
                                           r.draftId(), r.text(),
                                           r.parseMode(), null)
                    .cast(Response.class);

        // ... other reply types
    };
}

The sealed BotReply hierarchy makes this dispatch exhaustive at compile time. Adding a new reply type without handling it in the switch expression produces a compiler error — not a runtime surprise.
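
A compilable sketch (Java 21) of that idea, with record components simplified relative to the real BotReply hierarchy:

```java
// Simplified sealed hierarchy (fewer fields than the real BotReply) showing
// why the switch is exhaustive: the compiler knows every permitted subtype.
public class SealedDispatchDemo {

    sealed interface BotReply permits Message, Draft {}
    record Message(long chatId, String text) implements BotReply {}
    record Draft(long chatId, int draftId, String text) implements BotReply {}

    // Adding a new permitted subtype without a matching case fails to compile.
    static String dispatch(BotReply reply) {
        return switch (reply) {
            case Message m -> "sendMessage to " + m.chatId();
            case Draft d   -> "sendMessageDraft #" + d.draftId() + " to " + d.chatId();
        };
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new Draft(42L, 7, "partial...")));
        // prints: sendMessageDraft #7 to 42
    }
}
```

Note there is no default branch: the sealed declaration is what lets the compiler verify coverage.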


Back-Pressure Across the Pipeline

One important property of this architecture: back-pressure propagates end-to-end through reactive streams. If the Telegram API is slow to accept draft updates, the chain reacts automatically:

      graph LR
    LLM["LLM Token Stream"] -->|onPartialResponse| FC["Flux.create sink"]
    FC -->|NDJSON| WC["WebClient bodyToFlux"]
    WC -->|buffer 2s| BF["Buffer Operator"]
    BF -->|concatMap| DM["sendMessageDraft<br/>via Pulsar"]
    DM -->|consume + dispatch| TA["Telegram API"]

    style FC fill:#2d2d2d,stroke:#7b61ff,color:#fff
    style BF fill:#2d2d2d,stroke:#7b61ff,color:#fff
    
  • If the Telegram API slows down, concatMap stops requesting the next batch from the buffer.
  • If the buffer fills, it stops requesting from the WebClient’s bodyToFlux.
  • If the WebClient stops reading, TCP flow control pauses the chat service’s response.
  • If the response pauses, the Flux.create sink’s requestedFromDownstream() drops to zero.

The LLM itself is the only component that does not respect back-pressure — it generates tokens at its own pace. But the Flux.create sink absorbs this mismatch. Tokens accumulate in memory until downstream demand resumes. For LLM outputs — which are typically kilobytes, not gigabytes — this is an acceptable tradeoff.


The Complete Token Lifecycle

To see how all the pieces fit together, trace a single user message through the system:

      sequenceDiagram
    participant U as User
    participant TP as Telegram Poller
    participant PS as Pulsar
    participant TW as Telegram Worker
    participant CS as Chat Service
    participant LLM as LLM

    U->>TP: "Explain reactive streams"
    TP->>PS: publish update
    PS->>TW: deliver
    TW->>CS: POST /chat (NDJSON)

    CS->>LLM: streaming request
    LLM-->>CS: "Reactive"
    CS-->>TW: NDJSON line
    LLM-->>CS: " streams"
    CS-->>TW: NDJSON line
    LLM-->>CS: " are"
    CS-->>TW: NDJSON line

    Note over TW: 2s buffer fires

    TW->>TW: accumulate → "Reactive streams are"
    TW->>PS: draft reply (draftId=42)
    PS->>TP: deliver
    TP->>U: sendMessageDraft(42, "Reactive streams are")

    LLM-->>CS: " a specification"
    CS-->>TW: NDJSON line
    LLM-->>CS: " for..."
    CS-->>TW: NDJSON line

    Note over TW: 2s buffer fires

    TW->>TW: accumulate → "Reactive streams are a specification for..."
    TW->>PS: draft reply (draftId=42)
    PS->>TP: deliver
    TP->>U: sendMessageDraft(42, "Reactive streams are a specification for...")

    LLM-->>CS: [complete]
    CS-->>TW: stream complete

    TW->>PS: final reply (full text + keyboard)
    PS->>TP: deliver
    TP->>U: sendMessage(full text + keyboard)
    

The user sees text appearing progressively in their Telegram chat, exactly like typing indicators in ChatGPT — except the text is real content, not just “…”.


Timeouts and Resilience

Streaming LLM responses can take a long time. The WebClient on the worker side needs extended timeouts to accommodate this:

HttpClient httpClient = HttpClient.create()
        .responseTimeout(Duration.ofSeconds(60))
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
        .doOnConnected(conn -> conn
                .addHandlerLast(new ReadTimeoutHandler(300, TimeUnit.SECONDS))
                .addHandlerLast(new WriteTimeoutHandler(5, TimeUnit.SECONDS)));

Both settings are read-inactivity timeouts rather than caps on total duration: each resets whenever bytes arrive, so a multi-minute stream survives as long as chunks keep flowing. In practice the tighter 60-second responseTimeout governs; if no NDJSON line arrives for a full minute, the connection drops. This catches cases where the LLM hangs without producing output, while still allowing long pauses between tokens during complex reasoning. The 300-second ReadTimeoutHandler remains as a coarser safety net.


Conclusion

Streaming LLM responses to Telegram combines three reactive patterns:

  1. Token-level streaming — Flux.create bridges the callback-based LLM SDK into reactive streams, with NDJSON serialization handling the HTTP transport.
  2. Time-based buffering — .buffer(Duration.ofSeconds(2)) with concatMap batches tokens into draft updates, balancing responsiveness against API call volume.
  3. Draft message progression — sendMessageDraft with a stable draft_id updates the message bubble in-place, giving users real-time feedback while the response generates.

The reactive pipeline ensures back-pressure propagates from the Telegram API all the way back to the LLM token stream. Each service — poller, worker, chat — scales independently through Pulsar topic decoupling.

The result is the same streaming experience users expect from ChatGPT or Claude, delivered through Telegram’s native message interface — with no polling, no WebSocket, and no client-side rendering logic.


References

  1. Telegram Bot API — sendMessageDraft
  2. LangChain4j — Streaming
  3. LangChain4j — TokenStream API
  4. Project Reactor — Flux.create
  5. NDJSON Specification
  6. Streaming Data with Spring WebFlux and NDJSON
  7. Reactive Telegram Client: Polling, Pipelines, and Pulsar
  8. Reactive Architecture: Theoretical Dive into Blocking Architecture
  9. Comparing 3 Spring Boot Apps: Classic Servlet, Reactive, RSocket
  10. Building a RAG-Powered News Assistant with LangChain4J and Qdrant