From Design to Implementation

In the previous article, I outlined the design goals and high-level architecture of a reactive Telegram bot framework — why non-blocking I/O matters, how separating polling from processing enables horizontal scaling, and why sealed interfaces make response dispatch type-safe at compile time.

That article showed simplified code to explain the concepts. This one goes deeper. We will walk through the actual implementation: how Flux.create and expand drive the polling loop with back-pressure, how atomic variables coordinate concurrent state without locks, how the channel abstraction separates read and write concerns, and how Apache Pulsar’s reactive client integrates natively with Project Reactor.

If you are building the delivery layer for something like the RAG-powered assistant from the previous article — where users interact with an LLM through Telegram — this is the infrastructure that makes it reactive end-to-end.


Architecture — What Changed

The previous article described a single ReactiveQueue<T> interface with publish, subscribe, and close. In practice, this conflation caused problems: a component that only reads from a queue should not have access to publish, and a publisher should not accidentally call subscribe. The interface was split into two:

graph TD
    RC["ReactiveChannel<br/><i>extends AutoCloseable</i>"]
    RRC["ReadableReactiveChannel&lt;V&gt;<br/><i>Flux&lt;V&gt; subscribe()</i>"]
    WRC["WritableReactiveChannel&lt;I, O&gt;<br/><i>Mono&lt;O&gt; publish(I)</i>"]

    RC --> RRC
    RC --> WRC

    RRC --> PRRC["PulsarReadableReactiveChannel&lt;T&gt;"]
    WRC --> PWRC["PulsarWritableReactiveChannel&lt;T&gt;"]

This is Interface Segregation applied to reactive channels. The polling pipeline also changed fundamentally — from Flux.interval with a blanket retry() to Flux.create with demand-driven expansion. Let me walk through each component.


The Polling Pipeline — Flux.create Meets expand

The heart of the framework is TelegramBotUpdatePoller. Its job: continuously fetch updates from the Telegram Bot API and emit them as a Flux<Update> that respects downstream back-pressure.

The previous article showed a simplified version using Flux.interval:

// Simplified version from the previous article
Flux.interval(pollingInterval)
    .flatMap(tick -> client.getUpdates(offset, limit, timeout, allowedUpdates))
    .flatMap(response -> Flux.fromIterable(response.getResult()))
    .doOnNext(update -> offset = update.updateId() + 1)
    .retry()
    .subscribe();

This works, but has a fundamental issue: it ignores back-pressure. The interval ticks regardless of whether downstream consumers have processed previous updates. If the handler is slow, updates pile up in memory.

The actual implementation uses Flux.create with a demand-driven loop:

public Flux<Update> subscribe() {
    return Flux.<Update>create(sink -> {
        sink.onRequest(requested -> {
            Disposable disposable = pollUpdates(sink, lastOffset.get())
                .doOnNext(sink::next)
                .collectList()
                .expand(list -> {
                    if (list.size() >= limit
                            && sink.requestedFromDownstream() > 0
                            && !sink.isCancelled()) {
                        return pollUpdates(sink, lastOffset.get())
                            .doOnNext(sink::next)
                            .collectList();
                    }
                    sink.complete();
                    return Flux.empty();
                })
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();

            sink.onDispose(disposable);
            sink.onCancel(disposable);
        });
    });
}

There is a lot happening in these 20 lines. Let me break it down.

Flux.create — Bridging Push and Pull

Flux.create gives us a FluxSink — a programmatic handle for pushing elements into a reactive stream. Unlike Flux.generate (which is pull-based and synchronous), Flux.create supports multi-threaded emission and lets us control the lifecycle explicitly.

The sink.onRequest(requested -> ...) callback fires when the downstream subscriber signals demand. This is the back-pressure entry point: we only start polling when someone is ready to consume.

expand — Recursive Continuation Without Stack Overflow

After the first poll returns, we need to decide: should we poll again immediately, or stop? This is where expand comes in. It works like a breadth-first recursive flatMap:

  1. Collect the poll results into a List
  2. If the list is full (size >= limit), downstream still wants data (requestedFromDownstream() > 0), and the sink is not cancelled — poll again
  3. Otherwise, complete the stream

This is critical for Telegram’s long-polling model. When there are many pending updates, the API returns a full batch immediately. expand chains these batch fetches without returning to the event loop between each one, reducing latency during catch-up scenarios.
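
The continuation rule is easier to see in isolation. Below is a toy sketch — hypothetical names, plain integers instead of Update objects — where each "poll" returns a page of at most LIMIT items, and expand keeps fetching while pages come back full, exactly the rule the poller applies:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Toy model of the poller's continuation rule: keep fetching while a
// page comes back full, stop on the first short page.
public class ExpandDemo {

    static final int LIMIT = 3;

    // Simulates one poll: returns up to LIMIT items starting at `from`.
    static Mono<List<Integer>> fetchPage(int from, int total) {
        return Flux.range(from, Math.max(0, Math.min(LIMIT, total - from)))
            .collectList();
    }

    public static List<Integer> fetchAll(int total) {
        return fetchPage(0, total)
            .expand(page -> page.size() >= LIMIT
                // full page: there may be more — fetch the next one
                ? fetchPage(page.get(page.size() - 1) + 1, total)
                // short page: caught up, stop expanding
                : Mono.empty())
            .flatMapIterable(p -> p)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(7)); // [0, 1, 2, 3, 4, 5, 6]
    }
}
```

Note that expand emits each page as it arrives — the chain of fetches is breadth-first and heap-based, so there is no stack growth no matter how deep the backlog.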

Why Not Flux.interval?

| Aspect | Flux.interval | Flux.create + expand |
| --- | --- | --- |
| Back-pressure | Ignores downstream demand | Checks requestedFromDownstream() before each poll |
| Batch chaining | Fixed interval between polls | Immediate continuation when batch is full |
| Cancellation | Requires external Disposable management | Built-in via sink.onCancel |
| Thread model | Timer thread + flatMap scheduler | Single boundedElastic subscription |
| Error propagation | Requires explicit onErrorResume in the chain | Errors flow directly to sink.error |

Atomic State — Thread Safety Without Locks

The poller maintains three pieces of mutable state, all using java.util.concurrent.atomic types:

private final AtomicLong lastOffset = new AtomicLong(0);
private final AtomicBoolean inFlight = new AtomicBoolean(false);
private final AtomicReference<Disposable> subscription = new AtomicReference<>();

Offset Tracking

Each Telegram update has a monotonically increasing updateId. The next poll must send offset = lastUpdateId + 1 to avoid receiving duplicates. The update happens atomically:

.doOnNext(update -> {
    lastOffset.updateAndGet(currentVal -> {
        if (update.updateId() >= currentVal) {
            return update.updateId() + 1;
        }
        return currentVal;
    });
})

The updateAndGet with a conditional ensures we never move the offset backward — even if updates arrive out of order due to retry or concurrent emission.
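
The guarantee is easy to isolate in a tiny class — the names here are illustrative, not the framework's actual types:

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal model of the offset rule: advance to updateId + 1, but never
// move backward when an older update arrives late.
public class OffsetTracker {

    private final AtomicLong lastOffset = new AtomicLong(0);

    // Returns the offset after recording the update.
    public long record(long updateId) {
        return lastOffset.updateAndGet(current ->
            updateId >= current ? updateId + 1 : current);
    }

    public long current() {
        return lastOffset.get();
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.record(10); // offset -> 11
        tracker.record(5);  // stale id: offset stays 11
        System.out.println(tracker.current()); // 11
    }
}
```

Because updateAndGet retries its lambda under contention, two threads recording different ids concurrently still converge on the highest offset.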

Flight Control

The inFlight boolean prevents overlapping HTTP requests:

Flux<Update> pollUpdates(FluxSink<Update> sink, long offset) {
    if (sink.isCancelled() || !inFlight.compareAndSet(false, true)) {
        return Flux.empty();
    }

    if (sink.requestedFromDownstream() <= 0) {
        inFlight.set(false);
        return Flux.empty();
    }

    return botClient.getUpdates(offset, limit, timeout, allowedUpdates)
        .onErrorResume(WebClientRequestException.class, e -> {
            if (e.getCause() instanceof ReadTimeoutException) {
                return Flux.empty();
            }
            return Flux.error(e);
        })
        .doOnNext(update -> { /* offset tracking */ })
        // release the in-flight guard however the request ends
        .doFinally(signal -> inFlight.set(false));
}

Three guard clauses protect against invalid states:

  1. sink.isCancelled() — subscriber disconnected, stop immediately
  2. inFlight.compareAndSet(false, true) — another poll is already in progress, skip
  3. requestedFromDownstream() <= 0 — no demand, do not fetch

The ReadTimeoutException handling is important: with long polling, Telegram holds the connection open until the timeout expires, and if the HTTP client’s read timeout fires first, the request fails with a ReadTimeoutException even though nothing is wrong. This is normal operation during quiet periods, not an error — we return Flux.empty() and let expand decide whether to poll again.


The HTTP Client — WebClient and Non-Blocking I/O

TelegramBotClient wraps Spring’s WebClient to call every Telegram Bot API method reactively:

public Flux<Update> getUpdates(long offset, int limit, int timeout,
                                List<String> allowedUpdates) {
    return webClient.get()
        .uri(endpoint + "/bot" + token, uriBuilder ->
            uriBuilder
                .pathSegment("getUpdates")
                .queryParam("offset", offset)
                .queryParam("limit", limit)
                .queryParam("timeout", timeout)
                .queryParam("allowed_updates", allowedUpdates)
                .build()
        )
        .exchangeToMono(clientResponse ->
            extractResponse(clientResponse,
                new ParameterizedTypeReference<Response<List<Update>>>() {})
        )
        .flatMapMany(response -> Flux.fromIterable(response.getResult()));
}

Key details:

  • exchangeToMono gives access to the full ClientResponse, including status code and headers, before committing to a body type. This is where error responses are transformed into TelegramApiException.
  • flatMapMany unpacks the List<Update> from the response into individual elements in the stream. This is what allows the poller to process updates one by one rather than in batches.
  • The timeout parameter is passed directly to Telegram’s API — this is the long-polling timeout in seconds, not an HTTP timeout. The HTTP client should have its own read timeout configured slightly higher than this value.
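
With Reactor Netty as WebClient’s default connector, that configuration looks roughly like this — the 35-second value is an illustrative choice for a 30-second long-poll timeout, not a framework default:

```java
import java.time.Duration;

import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;

// Telegram long-poll timeout: 30 s. HTTP response timeout: 35 s, so the
// client never aborts a request Telegram is legitimately holding open.
HttpClient httpClient = HttpClient.create()
    .responseTimeout(Duration.ofSeconds(35));

WebClient webClient = WebClient.builder()
    .clientConnector(new ReactorClientHttpConnector(httpClient))
    .build();
```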

Every send method (sendMessage, sendPhoto, editMessageText, etc.) follows the same pattern — webClient.post() with form data, returning Mono<Response<T>>:

public Mono<Response<Message>> sendMessage(Long chatId, String text,
        @Nullable ParseMode parseMode, @Nullable Object replyMarkup, ...) {
    BodyInserters.FormInserter<String> inserter =
        BodyInserters.fromFormData("chat_id", String.valueOf(chatId))
                     .with("text", text);

    if (parseMode != null) {
        inserter = inserter.with("parse_mode", parseMode.name());
    }

    return webClient.post()
        .uri(endpoint + "/bot" + token, uriBuilder ->
            uriBuilder.pathSegment("sendMessage").build()
        )
        .contentType(MediaType.APPLICATION_FORM_URLENCODED)
        .body(inserter)
        .exchangeToMono(clientResponse ->
            extractResponse(clientResponse,
                new ParameterizedTypeReference<Response<Message>>() {})
        );
}

No thread is ever blocked waiting for a Telegram API response.


Building the Processing Pipeline — Pure Functional Composition

There is no dedicated handler interface or abstraction layer between the poller and your business logic. The Flux<Update> returned by TelegramBotUpdatePoller.subscribe() is a standard Reactor stream — you compose your processing pipeline using regular operators like flatMap, map, filter, and doOnNext.

The simplest pipeline — poll, process, reply:

poller.subscribe()
    .filter(update -> update.message() != null && update.message().text() != null)
    .flatMap(update -> {
        long chatId = update.message().chat().id();
        String text = update.message().text();

        return replier.dispatch(
            new BotResponse.SendMessage(chatId, "Echo: " + text, ParseMode.HTML)
        );
    })
    .subscribe();

No framework abstractions, no handler interfaces — just Reactor operators. This is deliberate. A dedicated UpdateHandler interface would add indirection without adding value: the Function<Update, Publisher<?>> that flatMap already accepts is the handler.

Multi-Step Responses

A single update often requires multiple outbound actions. Since each step returns a Mono, you chain them with then or thenReturn:

poller.subscribe()
    .filter(update -> update.message() != null)
    .flatMap(update -> {
        long chatId = update.message().chat().id();

        // 1. Show typing indicator, then send reply
        return replier.dispatch(new BotResponse.SendChatAction(chatId, ChatAction.TYPING))
            .then(replier.dispatch(
                new BotResponse.SendMessage(chatId, "Processed: " + update.message().text())
            ));
    })
    .subscribe();

Custom Error Handling and Retry

Because the pipeline is a plain Flux, you attach retry and error handling where it makes sense — per-update, per-section, or globally:

poller.subscribe()
    .flatMap(update -> processUpdate(update)
        .onErrorResume(TelegramApiException.class, e -> {
            log.warn("Telegram API error for update {}: {}", update.updateId(), e.getMessage());
            return Mono.empty(); // skip this update, continue processing
        })
    )
    .retry()  // restart the entire pipeline on unrecoverable errors
    .subscribe();

The inner onErrorResume isolates failures per update — one bad message does not kill the stream. The outer retry restarts the pipeline if the poller itself fails (network outage, Telegram API down).
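
A bare retry() resubscribes immediately and forever; in production a bounded exponential backoff is usually kinder to the API. Here is a self-contained sketch of Reactor’s Retry spec on a synthetic source that fails twice before succeeding (names and values are illustrative):

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

// Demonstrates retryWhen with exponential backoff on a source that
// errors on its first two subscriptions, then emits a value.
public class RetryDemo {

    public static String run() {
        AtomicInteger attempts = new AtomicInteger();
        Flux<String> source = Flux.defer(() ->
            attempts.incrementAndGet() < 3
                ? Flux.error(new RuntimeException("transient failure"))
                : Flux.just("ok"));

        String result = source
            // up to 5 retries, starting at 10 ms, doubling each time
            .retryWhen(Retry.backoff(5, Duration.ofMillis(10)))
            .blockLast();

        return result + " after " + attempts.get() + " attempts";
    }

    public static void main(String[] args) {
        System.out.println(run()); // ok after 3 attempts
    }
}
```

In the pipeline above, the same Retry spec would take the place of the bare retry() on the outer chain.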


The Channel Abstraction

The previous article used a single ReactiveQueue<T> interface. The current design splits this into two:

public interface ReactiveChannel extends AutoCloseable {
    void close();
}

public interface ReadableReactiveChannel<V> extends ReactiveChannel {
    Flux<V> subscribe();
}

public interface WritableReactiveChannel<I, O> extends ReactiveChannel {
    Mono<O> publish(I message);
}

The split has a practical consequence: WritableReactiveChannel<I, O> uses two type parameters. The input type I is what you publish; the output type O is what you get back (typically a message ID or acknowledgment). This matters for Pulsar, where publishing returns a MessageId that you might want to log or track.
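
To make the two-parameter split concrete without a broker, here is a hypothetical in-memory implementation backed by Reactor’s Sinks.Many, where O is a plain sequence number standing in for Pulsar’s MessageId. The interfaces are repeated so the sketch is self-contained:

```java
import java.util.concurrent.atomic.AtomicLong;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

interface ReactiveChannel extends AutoCloseable {
    void close();
}

interface ReadableReactiveChannel<V> extends ReactiveChannel {
    Flux<V> subscribe();
}

interface WritableReactiveChannel<I, O> extends ReactiveChannel {
    Mono<O> publish(I message);
}

// Hypothetical in-memory channel: I = V (the message itself),
// O = Long (a sequence number standing in for Pulsar's MessageId).
public class InMemoryChannel<V>
        implements ReadableReactiveChannel<V>, WritableReactiveChannel<V, Long> {

    // multicast().onBackpressureBuffer() buffers messages published
    // before the first subscriber arrives
    private final Sinks.Many<V> sink =
        Sinks.many().multicast().onBackpressureBuffer();
    private final AtomicLong sequence = new AtomicLong();

    @Override
    public Mono<Long> publish(V message) {
        return Mono.fromSupplier(() -> {
            sink.emitNext(message, Sinks.EmitFailureHandler.FAIL_FAST);
            return sequence.incrementAndGet();
        });
    }

    @Override
    public Flux<V> subscribe() {
        return sink.asFlux();
    }

    @Override
    public void close() {
        sink.tryEmitComplete();
    }
}
```

The same shape makes a useful test double: pipelines written against the channel interfaces run unchanged against the in-memory version or the Pulsar-backed one.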


Reactive Pulsar — Native Integration

The Pulsar queue module uses pulsar-client-reactive-adapter (0.7.0), which wraps the standard Pulsar client in Project Reactor types natively — no manual Mono.fromFuture bridging needed.

WritableReactiveChannel

public class PulsarWritableReactiveChannel<T>
        implements WritableReactiveChannel<MessageSpec<T>, MessageId> {

    private final ReactiveMessageSender<T> producer;

    public PulsarWritableReactiveChannel(ReactiveMessageSender<T> producer) {
        this.producer = producer;
    }

    @Override
    public Mono<MessageId> publish(MessageSpec<T> message) {
        return producer.sendOne(message);
    }
}

ReactiveMessageSender.sendOne() returns Mono<MessageId> — fully non-blocking, back-pressure aware, and integrated with Reactor’s context propagation for tracing.

ReadableReactiveChannel

public class PulsarReadableReactiveChannel<T>
        implements ReadableReactiveChannel<Message<T>> {

    private final ReactiveMessageReader<T> reader;

    public PulsarReadableReactiveChannel(ReactiveMessageReader<T> reader) {
        this.reader = reader;
    }

    @Override
    public Flux<Message<T>> subscribe() {
        return reader.readMany();
    }
}

ReactiveMessageReader.readMany() returns a Flux<Message<T>> that emits messages as they arrive on the topic. Back-pressure is handled at the Pulsar client level — if the subscriber slows down, the reader pauses fetching.

Comparison with the Previous Implementation

The previous article showed Pulsar integration using manual CompletableFuture bridging:

// Previous approach — manual bridging
public Mono<Void> publish(T message) {
    return Mono.fromFuture(() -> producer.sendAsync(serialize(message)))
               .then();
}

public Flux<T> subscribe() {
    return Flux.defer(() -> Mono.fromFuture(consumer::receiveAsync))
               .repeat()
               .flatMap(msg -> { /* manual ack/nack */ });
}

The current approach delegates everything to the reactive adapter:

| Aspect | Manual bridging | Reactive adapter |
| --- | --- | --- |
| Publish | Mono.fromFuture(producer::sendAsync) | producer.sendOne(messageSpec) |
| Subscribe | Flux.defer + repeat + manual ack | consumer.readMany() |
| Schema | Manual JSON serialization | Native Pulsar Schema&lt;T&gt; |
| Back-pressure | Implicit via flatMap concurrency | Native Pulsar reader flow control |
| Message key | Manual byte[] manipulation | MessageSpec.builder(...).key(...) |

The reactive adapter removes an entire category of bugs related to manual future-to-reactor bridging, acknowledgment ordering, and schema version handling.

Wiring It Up

Setting up the Pulsar channels requires the reactive client factory:

PulsarClient pulsarClient = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

ReactivePulsarClient reactivePulsarClient =
    AdaptedReactivePulsarClientFactory.create(pulsarClient);

// Reader — subscribes to updates
var reader = new PulsarReadableReactiveChannel<>(
    reactivePulsarClient.messageReader(Schema.STRING)
        .topic("persistent://public/default/bot-updates")
        .endOfStreamAction(EndOfStreamAction.POLL)
        .startAtSpec(StartAtSpec.ofEarliest())
        .build()
);

// Sender — publishes responses
var sender = new PulsarWritableReactiveChannel<>(
    reactivePulsarClient.messageSender(Schema.STRING)
        .topic("persistent://public/default/bot-responses")
        .build()
);

EndOfStreamAction.POLL is essential — it tells the reader to keep polling for new messages after reaching the end of the topic, rather than completing the Flux. Without this, the stream would terminate after consuming all existing messages.


Putting It All Together

Here is how the full reactive pipeline looks when all components are assembled:

sequenceDiagram
    participant TG as Telegram API
    participant Poller as TelegramBotUpdatePoller
    participant Pipeline as Reactor Pipeline<br/>(flatMap, map, filter)
    participant Replier as TelegramBotReplier
    participant Pulsar as Pulsar Topic

    loop Long-Polling Loop
        Poller->>TG: getUpdates(offset, limit, timeout)
        TG-->>Poller: List of Update
        Poller->>Poller: Update lastOffset atomically
    end

    Poller-->>Pipeline: Flux of Update
    Pipeline-->>Pipeline: Process, transform, filter
    Pipeline-->>Replier: BotResponse

    Replier->>Replier: Pattern match on sealed type
    Replier->>TG: sendMessage, sendPhoto, editMessageText
    TG-->>Replier: Response

    Note over Poller,Pulsar: With distributed processing via Pulsar
    Poller-->>Pipeline: flatMap to MessageSpec
    Pipeline-->>Pulsar: Publish Update
    Pulsar-->>Pipeline: Subscribe Update (shared subscription)
    Pipeline-->>Pulsar: Publish processed result
    Pulsar-->>Replier: Subscribe BotResponse

Direct pipeline — everything in one process:

TelegramBotClient client = new TelegramBotClient(botToken);
TelegramBotUpdatePoller poller = new TelegramBotUpdatePoller(client, 100, 30,
    List.of("message", "callback_query"));
TelegramBotReplier replier = new TelegramBotReplier(client);

poller.subscribe()
    .filter(update -> update.message() != null)
    .flatMap(update -> {
        long chatId = update.message().chat().id();
        return replier.dispatch(
            new BotResponse.SendMessage(chatId, "Processed: " + update.message().text())
        );
    })
    .onErrorResume(e -> {
        log.error("Pipeline error", e);
        return Mono.empty();
    })
    .retry()
    .subscribe();

The entire pipeline is a single Flux chain — poll, filter, process, dispatch. No handler interfaces, no framework callbacks. Standard Reactor composition gives you full control over error handling, retry strategy, and concurrency.

Distributed pipeline via Pulsar — split across services:

// Service 1: Poller — fetches updates and publishes to Pulsar
poller.subscribe()
    .flatMap(update -> updateSender.publish(
        MessageSpec.builder(update)
            .key(String.valueOf(update.message().chat().id()))
            .build()
    ))
    .retry()
    .subscribe();

// Service 2: Worker — reads from Pulsar, processes, writes results back
updateReader.subscribe()
    .flatMap(msg -> {
        Update update = msg.getValue();
        long chatId = update.message().chat().id();
        BotResponse response = new BotResponse.SendMessage(chatId, "Processed");
        return responseSender.publish(
            MessageSpec.builder(response).build()
        );
    })
    .retry()
    .subscribe();

// Service 3: Replier — reads responses from Pulsar, dispatches to Telegram
responseReader.subscribe()
    .flatMap(msg -> replier.dispatch(msg.getValue()))
    .retry()
    .subscribe();

Each service is a standalone Flux pipeline. The message key uses chat().id() to ensure ordering per conversation when using Pulsar’s key-shared subscription. Scaling workers is a matter of starting more instances — Pulsar distributes messages across them automatically.


Testing Reactive Pipelines

Reactor Test’s StepVerifier is indispensable for testing reactive code. The framework uses three testing layers:

Unit tests — verify pipeline logic by testing each flatMap stage in isolation:

@Test
void shouldProcessTextMessage() {
    Update update = createTestUpdate("hello");

    Mono<BotResponse> result = Mono.just(update)
        .filter(u -> u.message() != null && u.message().text() != null)
        .map(u -> new BotResponse.SendMessage(
            u.message().chat().id(), "Echo: " + u.message().text()));

    StepVerifier.create(result)
        .assertNext(r -> assertEquals("Echo: hello",
            ((BotResponse.SendMessage) r).text()))
        .verifyComplete();
}

Integration tests — HTTP client against MockServer:

@Test
void shouldHandleLongPollTimeout() {
    mockServer.when(request().withPath("/bot.*/getUpdates"))
        .respond(response().withDelay(TimeUnit.SECONDS, 35));

    StepVerifier.create(poller.subscribe().take(Duration.ofSeconds(5)))
        .expectComplete()
        .verify();
}

End-to-end tests — Pulsar channels with Docker Compose:

@Test
void publishAndSubscribe_singleMessage() {
    StepVerifier.create(
        sender.publish(MessageSpec.builder("hello").key("key-1").build()).then()
    ).verifyComplete();

    StepVerifier.create(reader.subscribe().take(1))
        .assertNext(received -> {
            assertEquals("hello", received.getValue());
            assertEquals("key-1", received.getKey());
        })
        .expectComplete()
        .verify(Duration.ofSeconds(30));
}

Conclusion

The gap between describing a reactive architecture and implementing one is where most of the interesting decisions live. Flux.create with expand gives us demand-driven polling that respects back-pressure. Atomic variables provide thread-safe state management without locks. Sealed interfaces with pattern matching make response dispatch exhaustive at compile time. And the reactive Pulsar adapter eliminates the manual CompletableFuture bridging that was the previous implementation’s weakest point.

The framework is open-source and available on GitHub.


References

  1. Project Reactor — Flux.create
  2. Reactor expand operator
  3. Apache Pulsar Reactive Client
  4. Telegram Bot API — getUpdates
  5. Spring WebFlux — WebClient
  6. Building a Reactive Telegram Bot Framework with Java
  7. Building a RAG-Powered News Assistant with LangChain4J and Qdrant