A short introduction to Apache Beam


Introduction

Once upon a time, in the wide world of Big Data, several frameworks and runtime environments existed to design, code and execute ETL, streaming and batch tasks. It was very complicated for developers and architects to choose wisely the one that matched their use case, that could evolve over time and that would scale effectively in production. There were Apache Spark SQL for queries, Apache Flink for real-time streaming, Hadoop MapReduce for batch workloads, and so on. Picking the right tool for the right job and deploying it in production was therefore a bit overwhelming.

To help the IT crowd (designers, developers, architects), Google created Dataflow and donated its SDK to the Apache Software Foundation in 2016.

Apache Beam was born.


After reading this “fairy tale”, you have probably understood that this framework aims to provide a unified programming model. With Apache Beam, you can design and implement a use case with a single API and run it on several environments.

Among these, we can find Apache Flink, Apache Spark and Google Cloud Dataflow.

For development and testing purposes, Apache Beam also provides a local runner which can be executed on your desktop.

Yes, the slogan of this framework could be “Write Once, Run Everywhere”.

A few concepts

From the first tutorials to more complex programs, you will deal with three main notions:

Pipeline


The notion of pipeline is similar to the one found in other frameworks (e.g. Flink). It is simply a representation of a set of transformations applied from an input to an output. Inputs and outputs can be message brokers (e.g. RabbitMQ, Kafka), databases (e.g. MongoDB) or a directory.

A pipeline can be executed once, periodically, or whenever new elements arrive (e.g. when a new file appears in a directory).

Collection

The collection (a PCollection in Apache Beam) is a representation of a distributed and immutable data set. It represents an image of an input or an output. Collections can be merged, split and filtered, and you can apply aggregations such as a group by key.

A simple copy:

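    import java.io.File;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public void extractFilesFromFolderToFile() {
        // A Pipeline is built from PipelineOptions; the factory only creates the options
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        pipeline.apply(TextIO.read().from("/path/to/folder/*")) // file pattern; creates a first PCollection
                .apply(TextIO.write().to(System.getProperty("java.io.tmpdir") + File.separator + "wordcounts"));
        pipeline.run().waitUntilFinish();
    }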

Apache Beam provides several connectors out of the box to interact with input or output data sources.

For instance, you can find connectors for:

  • Kafka
  • File directory
  • RabbitMQ
  • MongoDB
  • SQL databases
  • HDFS folders

You can check the list of all supported IO transformations in the official documentation.

Transformations

Between an input and an output (actually, you can have several input data sources and output destinations), you can apply transformations. Apache Beam provides several standard transformations, and the official documentation provides a complete guide to them.

You can also create your own transformations. For instance, the transformation below creates a key/value document:

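import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.bson.Document;

public class FormatAsKeyValueDocument extends SimpleFunction<Document, KV<String, Document>> {
    @Override
    public KV<String, Document> apply(Document input) {
        // The "id" field of the document becomes the key
        return KV.of(String.valueOf(input.get("id")), input);
    }
}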

Another example, which converts a JSON document stored in a String value into a MongoDB document:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.time.LocalDate;
import java.util.Map;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.bson.Document;

public class FormatStringAsDocument extends SimpleFunction<String, Document> {
    @Override
    public Document apply(String input) {
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            // We register the java time module to handle java.time objects
            objectMapper.registerModule(new JavaTimeModule());
            // Here Timestamp is the application's own DTO
            Timestamp timestamp = objectMapper.readValue(input, Timestamp.class);
            Map<String, Object> value = objectMapper.convertValue(timestamp, new TypeReference<Map<String, Object>>() {
            });
            // Replace the date fields by LocalDate objects
            value.put("date", LocalDate.parse(String.valueOf(value.get("date"))));
            value.put("beginning", LocalDate.parse(String.valueOf(value.get("beginning"))));
            value.put("end", LocalDate.parse(String.valueOf(value.get("end"))));
            return new Document(value);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

These transformations can be applied to your pipeline as follows:

    PCollection<KV<String, Document>> linesRead = pipeline.apply(TextIO.read()
                .from("path/to/directory/*")
                // poll for new files every 15 seconds, stop after 1 hour without any new file
                .watchForNewFiles(
                        Duration.standardSeconds(15),
                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
                // group the unbounded input into fixed windows of 10 seconds
                .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
                .apply(MapElements.via(new FormatStringAsDocument()))
                .apply(MapElements.via(new FormatAsKeyValueDocument()));

There are some constraints when applying transformations. The data you manipulate must be immutable. You should also avoid storing state in the fields of your transformations; if you do, these fields have to be either transient or serializable.
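A minimal sketch of the "transient or serializable" constraint, written in plain Java rather than with Beam's API. It assumes, as Beam does for its function objects, that the function instance is serialized before being sent to the workers. The names `PrefixFormatter`, `PrefixFn` and `SerializationSketch` are hypothetical and only serve the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper that is NOT serializable (think of a parser or a client). */
final class PrefixFormatter {
    private final String prefix;

    PrefixFormatter(String prefix) {
        this.prefix = prefix;
    }

    String format(String value) {
        return prefix + value;
    }
}

/** Hypothetical function object, comparable to what is shipped to the workers. */
final class PrefixFn implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String prefix;                 // serializable configuration: kept
    private transient PrefixFormatter formatter; // non-serializable helper: skipped, rebuilt lazily

    PrefixFn(String prefix) {
        this.prefix = prefix;
    }

    String apply(String input) {
        if (formatter == null) { // null again after deserialization
            formatter = new PrefixFormatter(prefix);
        }
        return formatter.format(input); // returns a new value, the input is left untouched
    }
}

final class SerializationSketch {
    /** Serializes then deserializes an object, as happens when a function is sent to a worker. */
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Without the `transient` keyword, `roundTrip` would fail with a `NotSerializableException` as soon as the helper has been initialised, because `PrefixFormatter` does not implement `Serializable`.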

Runtime environment

You select the runtime environment by adding the corresponding dependency to the classpath. For instance, to use the local direct runner on your desktop, add this dependency:

    compile group: 'org.apache.beam', name: 'beam-runners-direct-java', version: '2.11.0'

Benefits

As said previously, Apache Beam provides a kind of “write once, run everywhere” for the Big Data ecosystem. It decouples the API from the target runtime environment. In my opinion, it could help people focus their knowledge on a single API and then pick the target environment depending on the use case and the production requirements. It could also help to shift workloads from on-premise to cloud environments.

Limitations

During my tests, I noticed some limitations. Among these, we have:

Support for new frameworks

I first tried Apache Beam with Java 11… It’s still not supported, so I had to fall back to Java 8. Furthermore, I had some difficulties with the integration APIs (e.g. RabbitMQ or Kafka). I first tried to implement a simple use case with RabbitMQ, then chose to migrate to Kafka, which is better documented. You still have to be aware of the Kafka version: Beam doesn’t work well with older releases of Kafka.

Error handling

In my opinion, it’s the main limitation for using this framework in production (especially in on-premise environments).

On the one hand, during transformations, you can detect and handle errors by implementing a “dead letter queue” integration pattern. You can find a more detailed explanation here.
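The dead letter queue idea can be sketched in plain Java, independently of Beam's API: every element that fails during a transformation is routed to a separate output instead of failing the whole run. In Beam itself, this is usually expressed with a `ParDo` that has several outputs identified by `TupleTag`s. The names below (`DeadLetterSketch`, `Result`, `parseAll`) are hypothetical and only illustrate the routing logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DeadLetterSketch {

    /** Holds the main output and the dead letter output of one transformation step. */
    static final class Result {
        final List<Integer> parsed = new ArrayList<>();
        final List<String> deadLetters = new ArrayList<>();
    }

    /** Parses each element; elements that cannot be parsed go to the dead letter output. */
    static Result parseAll(List<String> inputs) {
        Result result = new Result();
        for (String input : inputs) {
            try {
                result.parsed.add(Integer.parseInt(input));
            } catch (NumberFormatException e) {
                // Keep the failing element so it can be inspected or replayed later
                result.deadLetters.add(input);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result result = parseAll(Arrays.asList("1", "2", "oops", "4"));
        System.out.println("parsed=" + result.parsed);
        System.out.println("deadLetters=" + result.deadLetters);
    }
}
```

The valid elements keep flowing through the main output, while the invalid one is kept aside for later inspection.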

On the other hand, during input reading or output writing, when running in streaming mode, a bundle including a failing item will be retried indefinitely, which may cause your pipeline to permanently stall. This behaviour is very similar to “Poison Message” errors in JMS.
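As a toy model of this behaviour (plain Java, not Beam's actual runner logic), the sketch below processes bundles in order and retries a failing bundle. The retry loop is capped by a hypothetical `maxAttempts` parameter only so that the example terminates; with an unbounded retry, the bundles behind the poison one would never be reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PoisonMessageSketch {

    /**
     * Processes bundles in order, retrying a failing bundle up to maxAttempts times.
     * Returns the bundles that were completed before the processing got stuck.
     */
    static List<String> process(List<String> bundles, Consumer<String> writer, int maxAttempts) {
        List<String> completed = new ArrayList<>();
        for (String bundle : bundles) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    writer.accept(bundle);
                    done = true;
                } catch (RuntimeException e) {
                    // Same input, same failure: retrying does not help for a poison bundle
                }
            }
            if (!done) {
                return completed; // stuck: the following bundles are never processed
            }
            completed.add(bundle);
        }
        return completed;
    }
}
```

In this model, a writer that always rejects one bundle blocks everything queued behind it, whatever the number of attempts.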

To sum up

This framework provides an innovative way to design and implement data workloads. Even if there are still some limitations, such as error handling, Apache Beam is an interesting and promising technology.

It decouples the API from the runtime environment. It could add value to your projects if you have to deal with both “on premise” and cloud environments, by avoiding duplicating your business logic across several technologies.


Written by

Alexandre Touret

Architect #java #api #ci #software_craftmanship