Writing Apache Spark UDFs in Go
Apache Spark is a perfect fit for processing large amounts of data. It’s not, however, a perfect fit for our language stack at Community. We are largely an Elixir shop with a solid amount of Go, while Spark’s native stack is Scala, with a Python API alongside it. We’re not JVM people, so we use the Python API via the Databricks platform. For most of our work, that’s just fine. But what happens when you want to write a custom user defined function (UDF) to do some heavy lifting? We could write new code in Python, or… we could use our existing Go libraries to do the job! That means wrapping our Go code in a jar file that can be loaded onto the classpath for the Spark job. This is how to do that.
(Note that everything below could equally apply to UDAFs—aggregate functions)
Why Did You Do This Crazy Thing?
You might be wondering how well this works. To put that issue to rest: it works well and has been effective for us. That being established, let’s talk about our use case. We have a large amount of data stored in a binary container format that wraps Protobuf records, stored on AWS S3. Spark is great with S3, but cannot natively read these files. Complicating things further, schemas for the Protobuf records need to be kept up to date for all the tools that process this data in anything but the most trivial way.
Over time we have built a set of battle-tested Go libraries that work on this data. Furthermore, we already maintain tooling to keep the Protobuf schemas and Go libraries up to date. It seems natural to leverage all of that goodness in our Spark jobs:
- Battle-tested libs that have been in production for a while, with good tests.
- We already manage pipelines to keep the schemas up to date for the Go libs.
Given the choice between reimplementing those libraries and their CI jobs, or wrapping the Go code up into jar files for use as Spark UDFs, we did the sane thing!
Spark UDFs
Spark is flexible and you can customize your jobs to handle just about any scenario. One of the most reusable ways to work with data in Spark is user defined functions (UDFs). These can be called directly inside Spark jobs or from Spark SQL. In our case we use a UDF to transform our custom binary input and Protobuf into something Spark understands much more easily: JSON.
This means we can do something like this:
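Roughly the following, from PySpark, once the jar we build below is available to the cluster. This is just an illustrative sketch: the class, function, column, and bucket names are placeholders, not our real ones.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# The UDF class lives in the jar we build later in this post; the jar has
# to be on the classpath (e.g. via spark.jars or --jars).
spark.udf.registerJavaFunction(
    "decode_event", "com.community.EventsUDF.Decoder", StringType()
)

# One illustrative way to get the raw bytes into a DataFrame.
events = spark.read.format("binaryFile").load("s3://our-bucket/events/")
events.createOrReplaceTempView("events")

# Call the UDF from Spark SQL; it hands back JSON we can work with natively.
decoded = spark.sql("SELECT decode_event(content) AS event_json FROM events")
```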
By transforming the data into JSON, we get to work on something that is much more native to Spark than Protobuf. And because we use the libraries that are already maintained with up-to-date bindings, we don’t have to manage any of that inside Spark itself. We only have to update our UDF in the jar loaded by the job.
Building a Spark UDF in Go
Before we look too hard at anything else, we need to get our Go code into a wrapper that lets us call it from Java. Then we’ll need to write a little Java glue to wrap the Go code in the right types to make Spark happy when calling it. Luckily, for this use case it’s all pretty simple.
Wrapping Go Code Into a Java Library
First, we need some Go code to wrap. The library we talked about was built for one purpose, but we need a little wrapper function to do just the part we need to call from Spark. In our case we’ll take a single value from Spark and return a single value back. Both of those values will be of type []byte, which makes things really simple going between Go and Java, where this type interchanges easily. I made a file called spark_udf.go that looks like this:
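Ours calls into our internal libraries, so here is a stripped-down sketch of its shape. The package and function names are stand-ins, and the body just wraps the payload so the example compiles on its own; the real version hands the bytes to our decoding library.

```go
// spark_udf.go
//
// Package eventdecoder exposes one function with a shape that is easy to
// bind to Java: []byte in, []byte out (plus an error, which surfaces as a
// thrown exception on the Java side).
package eventdecoder

import "encoding/json"

// Decode converts one raw container record into JSON. The real
// implementation unwraps our container format and decodes the Protobuf
// records with our existing libraries; this placeholder just wraps the
// payload so the sketch stands alone.
func Decode(raw []byte) ([]byte, error) {
	return json.Marshal(map[string]interface{}{"payload": raw})
}
```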
Now that we have that, we need to get it into a jar file with all the bindings that will let Java call into the Go function. As part of the gomobile effort to integrate Go with Android, there are some tools that will handle two-way wrappers between Java and Go. gobind will generate Java bindings for exported functions in a Go module. You could start there and build up the necessary tooling to make this work. Sadly, it’s not at all trivial to get it into a shape that will build nicely.
After messing around with it for a while, I found a tool called gojava that wraps all the hard parts of gobind into a single, easy-to-use tool. It’s not perfect: it does not appear to be under active development, and it does not support Go modules. But it makes life so much easier, and because none of this stuff changes very often, the lack of active development isn’t much of a hindrance here. The ease of use makes it worth it for us. Getting a working Java jar file is a single step:
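Assuming the wrapper package lives at an import path like github.com/community/eventdecoder (illustrative; see gojava’s README for the exact flags), it looks something like this:

```bash
# gojava comes from github.com/sridharv/gojava (install it with go get
# under a GOPATH-style setup).
gojava -o eventdecoder.jar build github.com/community/eventdecoder
```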
This will generate a file called eventdecoder.jar that contains your Go code and the Java wrappers to call it. Great, right? If you are using Go modules, just use go mod vendor before running gojava to make sure that you have all your dependencies in a form that gojava can handle.
Adding the Right Interface
But we are not done yet. The Go code in the jar we built does not have the
right interface for a Spark UDF. So we need a little code to wrap it. You could
do this in Scala, but for us Java is more accessible, so I used that. Spark
UDFs can take up to 22 arguments and there are different interfaces defined for
each set of arguments, named UDF1
through UDF22
. In our case we only want
one input: the raw binary. So that means we’ll use the UDF1
interface. Here’s
what the Java wrapper looks like:
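Something along these lines. The go.eventdecoder package and the Eventdecoder class and method names are my guess at what the generated bindings look like for a Go package named eventdecoder; check the contents of your own jar (jar tf eventdecoder.jar) for the exact names.

```java
// java/com/community/EventsUDF/Decoder.java
package com.community.EventsUDF;

import java.nio.charset.StandardCharsets;
import org.apache.spark.sql.api.java.UDF1;

// Generated by gojava from our Go package; exact names may differ.
import go.eventdecoder.Eventdecoder;

public class Decoder implements UDF1<byte[], String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(byte[] rawEvent) throws Exception {
        // Hand the raw bytes to the Go code and return the JSON as a
        // String so Spark sees an ordinary string column.
        byte[] json = Eventdecoder.decode(rawEvent);
        return new String(json, StandardCharsets.UTF_8);
    }
}
```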
We stick that in our source tree following the expected Java layout. So if spark_udf.go is in the current directory, the file above goes below it at java/com/community/EventsUDF/Decoder.java. Note that this path needs to match the package name inside the Java source file.
Assembling It
We’re almost there! But we need the Spark jar files that we’ll compile this against. Our project has a Makefile (which I’ll share at the end of this post) that downloads the correct jars and sticks them in ./spark_jars. With those present, we can compile the Java code:
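With the Spark jars in ./spark_jars and the gojava-built jar in the current directory, that step is roughly the following (paths are illustrative):

```bash
# Compile the wrapper against the Spark jars and the gojava-built jar.
javac -cp "spark_jars/*:eventdecoder.jar" \
    java/com/community/EventsUDF/Decoder.java
```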
With that compiled, we can just add it to our jar file and we’re ready to go!
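Adding the compiled class is a one-line jar update, something like this, assuming the class file landed next to its source under java/:

```bash
# Add the compiled wrapper class to the jar gojava produced.
cd java && jar uf ../eventdecoder.jar com/community/EventsUDF/Decoder.class
```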
That will update the jar file and insert our class. You can load that jar into Spark and call the UDF as shown at the start of this post.
Important Notes
You need to build this for the architecture where you will run your Spark job! Spark may be in Scala, and we may have wrapped this stuff up in a Java jar, but the code inside is still a native binary. You may have some success with GOOS and GOARCH settings, but cross-compiling has not been repeatable for us with gojava. We’ve built the UDFs we use on Databricks’ platform under Ubuntu 18.04 with Go 1.14 and they work great.
Another gotcha, which Aki Colović found when working with this stuff: your Go package name cannot contain an underscore, or it makes the Java class loader unhappy. So don’t do that.
A Makefile to Make it Simpler
There were a few finicky steps above and this needs to be repeatable. So, I built a Makefile to facilitate these UDF builds. It does all the important steps in one go. As you will see below, we have cached the right versions of the Spark jars in S3, but you can pull them from wherever you like. If you are into Java build tools, feel free to use those.
The Makefile I wrote looks like this. You will likely have to make small changes to make it work for you.
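Here is a sketch of its shape, pulling the steps above together. The package path, jar names, directory layout, and the S3 location of the cached Spark jars are all placeholders, and recipe lines must be indented with tabs.

```make
GO_PACKAGE := github.com/community/eventdecoder
UDF_JAR    := eventdecoder.jar
SPARK_JARS := spark_jars
JAR_CACHE  := s3://our-build-cache/spark-jars

.PHONY: all clean

# Build the bindings jar, fetch the Spark jars, then compile the Spark
# wrapper and add it to the jar.
all: $(UDF_JAR) $(SPARK_JARS)
	javac -cp "$(SPARK_JARS)/*:$(UDF_JAR)" java/com/community/EventsUDF/Decoder.java
	cd java && jar uf ../$(UDF_JAR) com/community/EventsUDF/Decoder.class

# Generate the jar with Java bindings for the Go code.
$(UDF_JAR): spark_udf.go
	go mod vendor
	gojava -o $(UDF_JAR) build $(GO_PACKAGE)

# Fetch the Spark jars we compile against (we cache ours in S3).
$(SPARK_JARS):
	mkdir -p $(SPARK_JARS)
	aws s3 cp --recursive $(JAR_CACHE)/ $(SPARK_JARS)/

clean:
	rm -rf $(UDF_JAR) $(SPARK_JARS) vendor java/com/community/EventsUDF/*.class
```

Running make then leaves the finished jar in the project root, ready to hand to Spark.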
Conclusion
What may seem like a slightly crazy idea turned out to be super useful, and we’ve now built a few UDFs based on this original one. Re-using Go libraries in your Spark UDFs is not only possible, it can be pretty productive. Give it a shot if it makes sense for you.
Credits
The original UDF project at Community was done by Alec Rubin and me, while improvements were made by Aki Colović and Tom Patterer.