sebastiandaschner news


monday, march 20, 2017

Welcome to my newsletter #2!

I’m sitting in a coffee shop in New York, preparing for the Oracle Code conference tomorrow. After some interesting days in Canada I had a few days of client work in Germany before crossing the Atlantic again, and now I’m in New York for the first time.

 

What’s new

 

Integrating Apache Kafka with Java EE

Apache Kafka can be used as a reliable, distributed Event Store and integrated into Java applications using the Java Client API. The API offers an easy integration into (enterprise) applications.

First, let’s have a look at how to produce messages.

@ApplicationScoped
public class EventProducer {

    private Producer<String, CoffeeEvent> producer;

    @Inject
    Properties kafkaProperties;

    @Inject
    Logger logger;

    @PostConstruct
    private void init() {
        producer = new KafkaProducer<>(kafkaProperties);
    }

    public void publish(CoffeeEvent event) {
        ProducerRecord<String, CoffeeEvent> record = new ProducerRecord<>("beans", event);
        logger.info("publishing = " + record);
        producer.send(record);
        producer.flush();
    }

    @PreDestroy
    public void close() {
        producer.close();
    }

}

The event producer is a CDI managed bean that holds a KafkaProducer<String, CoffeeEvent> and publishes messages, serialized from CoffeeEvent, to the topic beans. Each call sends and flushes the event, so publish(CoffeeEvent) only returns once the send has completed. Note that a failed send is reported through the Future returned by send (or through a callback), which this example does not inspect. On application shutdown the producer should be closed so that it does not leak resources; the @PreDestroy annotation ensures this.
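The injected Properties are not shown in this newsletter. As a rough sketch, they might be assembled like this. The configuration keys are standard Kafka client settings, but the broker address and the com.example.CoffeeEventSerializer / CoffeeEventDeserializer class names are assumptions made up for illustration. In a CDI application a method like this could be exposed through a @Produces method.

```java
import java.util.Properties;

final class KafkaConfig {

    // Builds client settings that both the producer and the consumer can start from.
    // Returning a fresh instance per call avoids sharing mutable state between beans.
    static Properties kafkaProperties(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);

        // Producer side: how keys and values are turned into bytes.
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "com.example.CoffeeEventSerializer"); // hypothetical class

        // Consumer side: how bytes are turned back into keys and values.
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "com.example.CoffeeEventDeserializer"); // hypothetical class

        // Offsets are then committed only by explicit commit calls such as commitSync.
        properties.put("enable.auto.commit", "false");
        return properties;
    }
}
```

Calling KafkaConfig.kafkaProperties("localhost:9092") would give a configuration for a local broker; a client ignores the settings that do not apply to it.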

Our consumer is wrapped in an EventConsumer, as we want it to consume messages as long as the application is up.

public class EventConsumer implements Runnable {

    private KafkaConsumer<String, CoffeeEvent> consumer;
    private final Consumer<CoffeeEvent> eventConsumer;
    private final AtomicBoolean closed = new AtomicBoolean();

    public EventConsumer(Properties kafkaProperties,
            Consumer<CoffeeEvent> eventConsumer, String... topics) {
        this.eventConsumer = eventConsumer;
        consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.subscribe(asList(topics));
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                consume();
            }
        } catch (WakeupException e) {
            // expected when stop() calls wakeup(); rethrow if we were not asked to stop
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    private void consume() {
        ConsumerRecords<String, CoffeeEvent> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, CoffeeEvent> record : records) {
            eventConsumer.accept(record.value());
        }
        consumer.commitSync();
    }

    public void stop() {
        closed.set(true);
        consumer.wakeup();
    }

}

The consumer subscribes to one or more topics, and poll(long) blocks until new messages arrive or the timeout expires. Each message is then handled by the Consumer<CoffeeEvent> provided by the caller. Kafka’s offset commits make consumption reliable per consumer group: once the consumer confirms consumption by calling commitSync, the same messages won’t be delivered a second time to the configured consumer group. Because the commit happens after processing, a crash between handling and commit leads to redelivery, so strictly speaking this gives “at least once” rather than “only once” semantics.

As we want consumption to continue indefinitely, we wrap consume in a while loop that ends when the caller invokes stop. To keep the consumer from being stuck in a blocking poll, stop also calls wakeup, which makes the pending poll throw a WakeupException.
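The flag-plus-wakeup pattern is not specific to Kafka. The following sketch reproduces it with JDK classes only, so it can be run without a broker: a blocking take() on a queue stands in for poll, and Thread.interrupt() stands in for wakeup. The class name StoppableLoop and the demo method are made up for this illustration; this is an analogy, not the Kafka API.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// JDK-only stand-in for the EventConsumer loop: take() plays the role of poll(),
// and interrupting the worker thread plays the role of consumer.wakeup().
final class StoppableLoop implements Runnable {

    private final BlockingQueue<String> source;
    private final Consumer<String> handler;
    private final AtomicBoolean closed = new AtomicBoolean();
    private volatile Thread worker;

    StoppableLoop(BlockingQueue<String> source, Consumer<String> handler) {
        this.source = source;
        this.handler = handler;
    }

    @Override
    public void run() {
        worker = Thread.currentThread();
        try {
            while (!closed.get()) {
                handler.accept(source.take()); // blocks until a message arrives
            }
        } catch (InterruptedException e) {
            // expected when stop() interrupts the blocking call
        }
    }

    void stop() {
        closed.set(true);          // ends the loop at the next check
        Thread thread = worker;
        if (thread != null) {
            thread.interrupt();    // wakes up a thread that is blocked in take()
        }
    }

    // Usage sketch: run the loop on its own thread, feed one message, then stop it.
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> seen = new CopyOnWriteArrayList<>();
        StoppableLoop loop = new StoppableLoop(queue, seen::add);
        Thread thread = new Thread(loop);
        thread.start();
        try {
            queue.put("espresso");
            while (seen.isEmpty()) {
                Thread.sleep(10);  // wait until the message has been handled
            }
            loop.stop();
            thread.join();         // returns because stop() unblocked the loop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Without the interrupt in stop, the worker would stay blocked in take() until the next message arrived, which is exactly the situation wakeup avoids for a Kafka consumer.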

Now another managed bean integrates this EventConsumer into our Java EE environment.

@Startup
@Singleton
public class BeanUpdateConsumer {

    private EventConsumer eventConsumer;

    @Inject
    Event<CoffeeEvent> events;

    @Resource
    ManagedExecutorService mes;

    @Inject
    Properties kafkaProperties;

    @Inject
    Logger logger;

    @PostConstruct
    private void init() {
        kafkaProperties.put("group.id", "beans-consumer-" + UUID.randomUUID());

        eventConsumer = new EventConsumer(kafkaProperties, ev -> {
            logger.info("firing = " + ev);
            events.fire(ev);
        }, "beans");

        mes.execute(eventConsumer);
    }

    @PreDestroy
    public void close() {
        eventConsumer.stop();
    }

}

This managed bean fires CDI events that contain the actual message. To do so, it creates a new EventConsumer with the corresponding setup and a (JDK 8) Consumer that fires the event. As enterprise developers are discouraged from starting unmanaged threads themselves, we use the injected ManagedExecutorService to run the consumer.

Again, the container will stop the event consumer in the @PreDestroy-annotated method.

This example is taken from the scalable coffee shop, discussed in my CQRS video course.

 

SummaryStatistics JDK classes

Recently, I learned (by accident) about the *SummaryStatistics classes that have shipped with the JDK since 1.8: DoubleSummaryStatistics, IntSummaryStatistics and LongSummaryStatistics. These classes implement the corresponding primitive consumers (such as DoubleConsumer) and calculate the count, sum, minimum, maximum and average of the provided values. They are meant to be used within streams as follows:

double averageAge = people.stream()
    .collect(Collectors.summarizingDouble(Person::getAge))
    .getAverage();
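The collector returns the whole statistics object, so a single pass over the stream yields more than the average. Here is a small self-contained sketch using the int variant; the Person class is a hypothetical stand-in reduced to what the example needs.

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.stream.Collectors;

final class AgeStatistics {

    // Hypothetical Person type with just an age.
    static final class Person {
        private final int age;

        Person(int age) {
            this.age = age;
        }

        int getAge() {
            return age;
        }
    }

    // One pass over the stream collects count, sum, min, max and average.
    static IntSummaryStatistics summarize(List<Person> people) {
        return people.stream()
                .collect(Collectors.summarizingInt(Person::getAge));
    }
}
```

For three people aged 20, 30 and 40, the result reports getMin() as 20, getMax() as 40, getCount() as 3 and getAverage() as 30.0.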

However, these classes can also be used standalone.

DoubleSummaryStatistics statistics = new DoubleSummaryStatistics();

statistics.accept(3);
statistics.accept(4);
statistics.accept(5);

assertThat(statistics.getAverage(), is(4d));

Just keep in mind that they’re not thread-safe. This is on purpose: they were primarily designed for streams, and parallel streams already provide the necessary isolation.
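If values are gathered in several places, for example one statistics object per thread, the partial results can be merged afterwards with combine. A minimal sketch (the class and method names are made up for illustration):

```java
import java.util.DoubleSummaryStatistics;
import java.util.stream.DoubleStream;

final class StatisticsMerge {

    // Accumulates two independent partial results and merges them with combine().
    static DoubleSummaryStatistics merged() {
        DoubleSummaryStatistics first = new DoubleSummaryStatistics();
        first.accept(3);
        first.accept(4);

        DoubleSummaryStatistics second = new DoubleSummaryStatistics();
        second.accept(5);

        first.combine(second); // first now reflects all three values
        return first;
    }

    // The same values through a parallel stream, which merges partial results itself.
    static DoubleSummaryStatistics viaParallelStream() {
        return DoubleStream.of(3, 4, 5).parallel().summaryStatistics();
    }
}
```

Both methods end up with a count of 3 and an average of 4.0; each statistics instance is only ever touched by one thread at a time.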

 

IntelliJ navigation

This newsletter’s “making-developers-faster” tip is about IntelliJ’s navigation again:

With Ctrl + U you navigate to the super declaration of a method; it is the counterpart to Ctrl + Alt + B, which jumps to the implementations. Placing the cursor on toString in the String class will, for instance, jump to Object#toString.

 

Thanks a lot for reading and greetings from Manhattan, New York!

 

All opinions are my own and do not reflect those of my employer or colleagues.