Writing Apache Spark UDFs in Go

[Image: Spark Gopher]

Apache Spark is a perfect fit for processing large amounts of data. It's not, however, a perfect fit for our language stack at Community. We are largely an Elixir shop with a solid amount of Go, while Spark's native stack is Scala, with a Python API as well. We're not JVM people, so we use the Python API via the Databricks platform. For most of our work, that's just fine. But what happens when you want to write a custom user defined function (UDF) to do some heavy lifting? We could write new code in Python, or… we could use our existing Go libraries to do the job! That means wrapping up our Go code into a jar file that can be loaded onto the classpath for the Spark job. This is how to do that.

(Note that everything below could equally apply to UDAFs, i.e. user defined aggregate functions.)

Why Did You Do This Crazy Thing?

You might be wondering how well this works. To put that issue to rest: it works well and has been effective for us. That being established, let’s talk about our use case. We have a large amount of data stored in a binary container format that wraps Protobuf records, stored on AWS S3. Spark is great with S3, but cannot natively read these files. Complicating things further, schemas for the Protobuf records need to be kept up to date for all the tools that process this data in anything but the most trivial way.

Over time we have built a set of battle-tested Go libraries that work on this data. Furthermore, we already maintain tooling to keep the Protobuf schemas and Go libraries up to date. It seems natural to leverage all of that goodness in our Spark jobs:

  1. Battle-tested libs that have been in production for a while, with good tests.
  2. We already manage pipelines to keep the schemas up to date for the Go libs.

Given the tradeoffs of reimplementing those libraries and the CI jobs, or wrapping Go code up into jar files for use as Spark UDFs, we did the sane thing!

Spark UDFs

Spark is flexible and you can customize your jobs to handle just about any scenario. One of the most reusable ways to work with data in Spark is user defined functions (UDFs). These can be called directly inside Spark jobs or from Spark SQL. In our case we use a UDF to transform our custom binary input and Protobuf into something that Spark understands more easily: JSON.

This means we can do something like this:

import pyspark.sql.functions as F
import pyspark.sql.types as T

spark.udf.registerJavaFunction("decodeContent", "com.community.EventsUDF.Decoder")

# Define a schema for the JSON fields we want (you must customize)
event_schema = T.StructType([
  T.StructField("wrapper", T.StructType([
    T.StructField("id", T.StringType()),
    T.StructField("somefield", T.StringType()),
    T.StructField("timestamp", T.StringType()),
    T.StructField("anotherfield", T.StringType())
  ]))
])

events_df = (
  spark.read.format("binaryFile")
  .load("s3a://path_to_data/*")
  .withColumn("content", F.expr("decodeContent(content)").cast(T.StringType()))
)

parsed_json_df = (
  events_df
  .withColumn("json_data", F.from_json("content", event_schema))
  # Break out part of the JSON into a column
  .withColumn("somefield", F.col("json_data.wrapper.somefield"))
)

# Continue working with the dataframe

By transforming the data into JSON, we get to work with something that is much more native to Spark than Protobuf. Because we use libraries that are already maintained with up-to-date bindings, we don't have to manage that inside Spark itself; we only have to update the UDF in the jar loaded by the job.
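Since the UDF is registered by name, it can also be called directly from Spark SQL. Here is a minimal sketch of that path, assuming the decodeContent registration shown above; the view name and output column are placeholders:

# Read the raw files and expose them as a temporary view (names are placeholders)
raw_df = spark.read.format("binaryFile").load("s3a://path_to_data/*")
raw_df.createOrReplaceTempView("raw_events")

# Call the registered UDF from SQL and cast the binary result to a string
decoded_df = spark.sql(
  "SELECT CAST(decodeContent(content) AS STRING) AS decoded FROM raw_events"
)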

Building a Spark UDF in Go

Before we look too hard at anything else we need to get Go code into a wrapper that lets us call it from Java. Then we’ll need to write a little Java glue to wrap the Go code into the right types to make Spark happy when calling it. Luckily, for this use case it’s all pretty simple.

Wrapping Go Code Into a Java Library

First, we need some Go code to wrap. The library we talked about was built for one purpose, but we need a little wrapper function that does just the part we want to call from Spark. In our case we take a single value from Spark and return a single value back. Both values are of type []byte, which interchanges easily between Go and Java and keeps things really simple. I made a file called spark_udf.go that looks like this:

package eventdecoder

import (
        "bytes"
        "compress/gzip"
        "io/ioutil"

        "github.com/company/yourlib"
)

func Decode(data []byte) []byte {
        zReader, err := gzip.NewReader(bytes.NewReader(data))
        if err != nil {
                // We return an error as the body!
                return []byte(err.Error())
        }

        uncompressedData, err := ioutil.ReadAll(zReader)
        if err != nil {
                // We return an error as the body!
                return []byte(err.Error())
        }

        output, _, _, err := yourlib.DoSomeWork(uncompressedData)
        if err != nil {
                // We return an error as the body!
                return []byte(err.Error())
        }

        return bytes.Join(output, []byte("\n"))
}

Now that we have that, we need to get it into a jar file with all the bindings that will let Java call into the Go function. As part of the gomobile effort to integrate Go with Android, there are tools that handle two-way wrappers between Java and Go: gobind will generate Java bindings for the exported functions in a Go package. You could start there and build up the necessary tooling to make this work. Sadly, it's not at all trivial to get it into a shape that builds nicely.

After messing around with it for a while, I found a tool called gojava that wraps all the hard parts of gobind into a single, easy-to-use tool. It's not perfect: it does not appear to be under active development, and it does not support Go modules. But it makes life so much easier, and because none of this stuff changes very often, the lack of active development isn't much of a hindrance here. The ease of use makes it worth it for us. Getting a working Java jar file is a single step:

JAVA_HOME=<your_java_home> \
        GO111MODULE=off \
        gojava -v -o `pwd`/eventdecoder.jar build github.com/<your_module_path>

This will generate a file called eventdecoder.jar that contains your Go code and the Java wrappers to call it. Great, right? If you are using Go modules, just use go mod vendor before running gojava to make sure that you have all your dependencies in a form that gojava can handle.

Adding the Right Interface

But we are not done yet. The Go code in the jar we built does not have the right interface for a Spark UDF, so we need a little code to wrap it. You could do this in Scala, but for us Java is more accessible, so I used that. Spark UDFs can take up to 22 arguments, and there is a separate interface defined for each number of arguments, named UDF1 through UDF22. In our case we only want one input, the raw binary, so we'll use the UDF1 interface. Here's what the Java wrapper looks like:

package com.community.EventsUDF;

import org.apache.spark.sql.api.java.UDF1;
import go.eventdecoder.Eventdecoder;

public class Decoder implements UDF1<byte[], byte[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] call(byte[] input) throws Exception {
                // Call our Go code
                return Eventdecoder.Decode(input);
        }
}

We stick that in our path following the expected Java layout. So if spark_udf.go is in the current directory, we put the file above at java/com/community/EventsUDF/Decoder.java beneath it. Note that this path needs to match the package name inside the Java source file.
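Concretely, with the names used in this post, the layout looks something like this (the top-level directory name is just an example):

eventdecoder/
├── spark_udf.go
└── java/
    └── com/
        └── community/
            └── EventsUDF/
                └── Decoder.java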

Assembling It

We're almost there! But we need the Spark jar files that we'll compile this against. Our project has a Makefile (shared at the end of this post) that downloads the correct jars and puts them in ./spark_jars. With those in place, we can compile the Java code:

javac -cp \
	spark_jars/spark-core_$(VERSION).jar:eventdecoder.jar:spark_jars/spark-sql_$(VERSION).jar \
	java/com/community/EventsUDF/Decoder.java

With that compiled we can just add it to our Jar file and we’re ready to go!

cd java && jar uf ../eventdecoder.jar com/community/EventsUDF/*.class

That will update the jar file and insert our class. You can load that into Spark and call it as shown at the start of this post.
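How the jar gets onto the classpath depends on how you launch Spark. On Databricks you would typically attach the jar to the cluster as a library; for a plain PySpark session, a sketch like the following should work (the jar path and app name are placeholders):

from pyspark.sql import SparkSession

# Put the UDF jar on the driver and executor classpaths when building the session
spark = (
  SparkSession.builder
  .appName("event-decoding")
  .config("spark.jars", "/path/to/eventdecoder.jar")
  .getOrCreate()
)

# Now the wrapper class is on the classpath and can be registered as before
spark.udf.registerJavaFunction("decodeContent", "com.community.EventsUDF.Decoder")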

Important Notes

You need to build this for the architecture where you will run your Spark job! Spark may be written in Scala, and we may have wrapped this stuff up into a Java jar, but the code inside is still a native binary. You may have some success with GOOS and GOARCH settings, but that has not been repeatable for us with gojava. We've built the UDFs we use on Databricks' platform under Ubuntu 18.04 with Go 1.14, and they work great.

Another gotcha, which Aki Colović found when working with this stuff, is that your Go package name cannot contain an underscore, or it makes the Java class loader unhappy. So don't do that.

A Makefile to Make it Simpler

There were a few finicky steps above and this needs to be repeatable. So, I built a Makefile to facilitate these UDF builds. It does all the important steps in one go. As you will see below, we have cached the right versions of the Spark jars in S3, but you can pull them from wherever you like. If you are into Java build tools, feel free to use those.

The Makefile I wrote looks like this. You will likely have to make small changes to make it work for you.

# Makefile to build and test the eventdecoder.jar containing the
# Apache Spark UDF for decoding the events files in the Community
# event store.
SCALA_VERSION := 2.11
SPARK_VERSION := 2.4.5
VERSION := $(SCALA_VERSION)-$(SPARK_VERSION)
BUCKET_PATH ?= somewhere-in-s3/spark
JAVA_HOME ?= /usr/lib/jvm/java-8-openjdk-amd64
TEMPFILE := $(shell mktemp)

all: ../vendor udf

../vendor:
	go mod vendor

.PHONY: gojava
gojava:
	go get -u github.com/sridharv/gojava
	go install github.com/sridharv/gojava

eventdecoder.jar: gojava
	JAVA_HOME=$(JAVA_HOME) \
		GO111MODULE=off \
		gojava -v -o `pwd`/eventdecoder.jar build github.com/my-package/eventdecoder

spark_jars/spark-sql_$(VERSION).jar:
	mkdir -p spark_jars
	aws s3 cp s3://$(BUCKET_PATH)/spark-sql_$(VERSION).jar spark_jars

spark_jars/spark-core_$(VERSION).jar:
	mkdir -p spark_jars
	aws s3 cp s3://$(BUCKET_PATH)/spark-core_$(VERSION).jar spark_jars

spark-binaries: spark_jars/spark-core_$(VERSION).jar spark_jars/spark-sql_$(VERSION).jar

# Build the UDF code and insert into the jar
udf: spark-binaries eventdecoder.jar
	javac -cp spark_jars/spark-core_$(VERSION).jar:eventdecoder.jar:spark_jars/spark-sql_$(VERSION).jar java/com/community/EventsUDF/Decoder.java
	cd java && jar uf ../eventdecoder.jar com/community/EventsUDF/*.class

.PHONY: clean
clean:
	rm -f eventdecoder.jar

Conclusion

What may seem like a slightly crazy idea turned out to be super useful, and we've now built a few UDFs based on this original one. Re-using Go libraries in your Spark UDFs is not only possible, it can be pretty productive. Give it a shot if it makes sense for you.

Credits

The original UDF project at Community was done by Alec Rubin and me, while improvements were made by Aki Colović and Tom Patterer.