Data Microservices in Apache Spark using Apache Arrow Flight

Hi everyone, thanks for coming to the webinar. My name is Ryan Murray; I've been at Dremio for about a year, working as a developer on some open source projects, some research, and that kind of stuff. Today I want to share with you one of the cooler things we've been doing lately, which is our work on Apache Arrow Flight, and in particular for this talk the Spark connector and how that can apply to building data microservices.

For those of you who aren't aware, I thought I'd level set on what Apache Arrow is and get everyone on the same page before we go any further. Arrow has really become the industry standard for in-memory data. I'm not sure how many people have heard of it, but it's in tons of different applications: it's used in Spark and in Dremio, and NVIDIA is doing some interesting stuff with it on GPUs. It's spreading all over the place, and that's really by design. One of the goals we had when we first formed Arrow as a community was to make it a lingua franca for data: the idea that if you store data in Arrow format, there are all kinds of tools you can leverage to do calculations on that data, and there are understood, clear, standardized ways to move data between processes, applications, and machines.

Since the first release of Arrow back in 2016, growth has been exponential; every month there are even more downloads. Part of that is the broad language support: it's implemented in more than half a dozen languages now. Some of these wrap the C++ libraries and others are native implementations, and that really helps with the lingua franca part: every programming language is speaking the same language when it comes to data. As I said, the community is very active, with over 300 developers doing a lot of interesting stuff making Arrow work on CPUs, GPUs, and more recently even FPGAs.
So what is Arrow? Well, simply, it's an in-memory specification for data: it tells you how to lay out your data in memory in a binary format that makes it extremely efficient for large analytical workloads, irrespective of whether you're on CPUs, GPUs, or more exotic things. Besides that, it's a set of tools: you have the standard, and then we've built a lot of tools in the community to help you manipulate data in that standard. You can think of them as Lego bricks for building novel data applications. Some examples are getting data into and out of Arrow from various formats, whether it's Avro or Parquet or something like that; others are compute kernels or engines which help you do calculations on the Arrow data; and even things like Flight, which is an RPC mechanism, and other ways of trading data with other applications or processes.

It's important to say what Arrow isn't, too. It isn't an installable system as such: you can't go and download a copy of Arrow like you would Spark and run it; rather, it's a library. Spark uses Arrow to be efficient with columnar data. Nor is it an in-memory grid or in-memory database or something like that; while you can build those sorts of things with Arrow, it doesn't have any of them in it on its own. It's more a set of primitives. And finally, it's important to mention that it's not really designed for streaming records: you're not sending single-row Arrow batches; you tend to have dozens, hundreds, thousands, or millions of rows per record batch.
That's mostly for efficiency: the columnar data structure, which we'll describe in a second, is designed for larger datasets, and the overhead for small, single-record operations is too high to make it useful there.

So that's what Arrow is. What does the format look like? As I mentioned, the important thing here is that it's a columnar data format. Compare it to the traditional memory format in the bottom right there: in the traditional format, every row in your table is a block of contiguous memory. Say you wanted to take the max of session ID or something like that. To do that, you have to read row 1, then skip ahead to row 2 to read the next value of session ID, and because the interleaved fields might be variable length, strings or whatever, you can't really hop to the next row; you have to read all those bytes. You end up reading the entire dataset just to pull out a single column, and that's really expensive for the CPU: you have to load a lot of stuff into your cache, you're constantly breaking your pipelines, lots of cache misses, all that kind of stuff. That's really not efficient.

Arrow does something that will look familiar to people who know pandas: it stores everything in a columnar layout. There you have all your session IDs in one contiguous block of memory, so when you want to do calculations on them, you can load a good chunk of that block directly into the CPU cache and do the calculation in one go, sometimes leveraging SIMD (single instruction, multiple data) operations, where you're actually doing several of these calculations in a single CPU cycle. So you get a huge efficiency gain from this locality of data.
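To make that concrete, here's a minimal sketch in Python with pyarrow (my example, not from the talk) showing a single column being aggregated straight out of its contiguous buffer:

```python
import pyarrow as pa
import pyarrow.compute as pc

# Each column lives in its own contiguous buffer.
table = pa.table({
    "session_id": pa.array([101, 102, 103, 104], type=pa.int64()),
    "user": pa.array(["a", "b", "c", "d"]),
})

# Aggregating one column touches only that column's buffer, so the
# CPU streams through contiguous memory (cache- and SIMD-friendly).
print(pc.max(table["session_id"]))  # -> 104
```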
It also allows you to do all kinds of interesting things: like I said, you can leverage some of the interesting architectural aspects of GPUs, with lots of scatter/gather-IO-type operations you can use to really perform these operations efficiently.

So that's the format. What are some of the building blocks? These are the Lego bricks I mentioned before; let's take a quick survey of four of the most interesting ones.
First off are the Parquet readers and writers. These are designed to get data from Parquet to Arrow, or from Arrow to Parquet, very fast. This is done natively at the C++ level, so it's an extremely fast operation. It's particularly useful because the Parquet file format is so similar to the Arrow format; you can sort of think of them as cousins. It becomes very efficient to stream Parquet into Arrow, and when the rest of your data pipeline is Arrow, you're not constantly changing data structures and formats, and you're never marshalling data once it's out of Parquet.

Similarly, another fast implementation is Feather. If you're familiar with the work of Wes McKinney, one of the Arrow founders, you might have seen Feather; it's a good way to get data between R and Python very fast. It's meant to be ephemeral; it's not durable storage like you'd expect from Parquet, but it's extremely fast.
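As an illustration (mine, not the speaker's), the Parquet round trip looks like this with pyarrow; the file name is arbitrary:

```python
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"session_id": [101, 102, 103]})

# Both steps run in native C++, and the two formats map closely
# onto each other, so conversion overhead stays low.
pq.write_table(table, "sessions.parquet")
round_tripped = pq.read_table("sessions.parquet", columns=["session_id"])
```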
On the compute kernel side, you have something called Gandiva. In that case, you have some arbitrary expression, maybe some filters from a SQL statement, or you're multiplying some columns together and dividing by two, something like that. You can take that expression and feed it into Gandiva, and Gandiva spits out LLVM bytecode for that expression; LLVM then just-in-time compiles it into machine code for your native platform. So you're able to run arbitrary expressions on your Arrow data and get machine-code speed regardless of your starting language: it doesn't matter if you're in Java, JavaScript, Ruby, or whatever else, you're going to get full machine-level speed on these compute kernels.
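For a flavor of the API, here's a sketch using the experimental Gandiva bindings that have shipped with some pyarrow builds (availability and the import path vary by version, so treat this as illustrative rather than definitive):

```python
import pyarrow as pa
import pyarrow.gandiva as gandiva

schema = pa.schema([pa.field("x", pa.float64())])
batch = pa.record_batch([pa.array([2.0, 4.0, 6.0])], schema=schema)

# Build the expression x / 2; Gandiva JIT-compiles it via LLVM.
builder = gandiva.TreeExprBuilder()
node_x = builder.make_field(schema.field("x"))
two = builder.make_literal(2.0, pa.float64())
half = builder.make_function("divide", [node_x, two], pa.float64())
expr = builder.make_expression(half, pa.field("x_half", pa.float64()))

projector = gandiva.make_projector(schema, [expr], pa.default_memory_pool())
result, = projector.evaluate(batch)  # Arrow array holding x / 2
```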
And finally, Arrow Flight. That's really why we're here today; it's the newest member of these building blocks, and it's our RPC mechanism. So what is Arrow Flight? Simply, it's a high-performance protocol that defines how to move data between two systems, and the key here is that it's a bulk transfer system. One of the reasons it's so much faster than other implementations is that you take an Arrow buffer and write it directly into your network buffer, so you don't have to translate your data before writing it onto the network. Similarly, the client receives the Arrow data from the network and materializes it directly into an Arrow buffer, so you're not dealing with all the marshalling and the expensive CPU work of constantly changing the format your data is in. That's where the speed comes from.

This is also sort of the last piece in our interoperability promise. As I said, one of the founding points of Arrow is to be a lingua franca for data, and what Arrow Flight does is allow any system, on most any operating system, in most any programming language, to talk to the others in an understood, known language. We never have to marshal, change, or transform data. And it's built from the ground up to support parallel streams, which I'll get to in a few minutes, and security: out of the box, in maybe a dozen lines of Python, you can write a Flight server that is SSL-encrypted and has authentication attached to it. So some of the features you'd expect are built in.
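Here's roughly what that dozen-line server looks like in Python (a minimal sketch; the dataset, port, and class name are mine, and the TLS/auth options are omitted):

```python
import pyarrow as pa
import pyarrow.flight as flight

class TinyFlightServer(flight.FlightServerBase):
    def __init__(self, location="grpc://0.0.0.0:8815"):
        super().__init__(location)
        self._table = pa.table({"session_id": [101, 102, 103]})

    def do_get(self, context, ticket):
        # Stream the dataset back as a sequence of Arrow record batches.
        return flight.RecordBatchStream(self._table)

TinyFlightServer().serve()
```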
So what does this protocol actually look like? The underlying transport is built primarily on gRPC. If you're not familiar with gRPC, it's Google's RPC mechanism, and the idea is that you define a concrete set of operations you know how to do, and then clients come in and request those operations, either supplying you data or requesting data from you. One of the core concepts in gRPC is streams: rather than "I give you some data, you give me back some data," I can open up a stream and continuously feed you data or receive data. So a client-server interaction looks something like: I'm going to send you data; here's a batch, here's a batch, here's a batch, here's a batch, I'm done. The efficiency and the ease of use of gRPC help us do that.
And it's going to be really fast: we also interact with a relatively low layer of gRPC, which is where we're able to get the zero-copy between the network buffer and the in-memory process buffers. As for what Flight can support right now: you can do puts and gets, so you can give a server data or get data from a server, and a recent addition is bidirectional streams, a constant interchange of data. Everything is initiated by the client here; the client is in control of how these transactions happen, so you're never standing around waiting for a server to contact you.
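On the client side, a get and a put look something like this (my sketch; the ticket bytes and upload path are hypothetical):

```python
import pyarrow as pa
import pyarrow.flight as flight

client = flight.connect("grpc://localhost:8815")

# do_get: redeem a ticket and read the whole stream into a table.
reader = client.do_get(flight.Ticket(b"sessions"))
table = reader.read_all()

# do_put: open an upload stream and send record batches to the server.
descriptor = flight.FlightDescriptor.for_path("uploads/sessions")
writer, _ = client.do_put(descriptor, table.schema)
writer.write_table(table)
writer.close()
```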
This is what originally got us thinking about the concept of microservices. Say you have a Spark cluster and a Dremio cluster, and you want a very large dataset from Dremio in Spark for some data science application. You use the Flight connector and you can stream in parallel, bringing back a very large dataset; then you train your model and expose that model as a Flight endpoint, so now a further downstream consumer is able to send you a matrix and get back a prediction vector, or something like that. You're able to start building a network of small servers, each doing really concrete things.

Something more esoteric might be a machine learning mixing model: you ask the system for a prediction, and it federates out to a bunch of different machine learning models. Since they all know how to talk Flight, and they're all using Arrow under the hood, they can be TensorFlow, Spark, plain Python; it doesn't really matter. You ask all of them to score the data with their individual models, bring all that back, mix it together in the upstream microservice, and send it back to the client. So you can start breaking your data pipeline down into a bunch of distinct components that can be reused, deployed, and developed individually, and you start seeing this sort of microservices architecture come out in the data world. For me at least, this makes the concept of a data and machine learning pipeline much more palatable to manage.
So that's the underlying idea. How do the parallel streams work? When you ask a server for a dataset, you ask for flight info, and you get back a set of flight endpoints; those flight endpoints represent your streams. If you collect the data from all those flight endpoints, you'll have your entire dataset. A flight endpoint is simply a flight ticket, a token saying you're allowed to get a very specific portion of a specific dataset, plus an endpoint location, so you know where to get the data from. When you get back this set of flight endpoints, you're able to break them up any way you want. If you get ten back from your flight server, you can redeem the flight tickets one at a time in serial if you want, or you can do them in a big bang: you can spread them across a parallel system, either in memory in a single process or across a bunch of Spark executors, and you're really just linearly multiplying your ability to move data.
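In pyarrow, that discover-then-redeem flow looks roughly like this (my sketch; the host and dataset path are made up, and the per-endpoint fetches could run in parallel instead of a loop):

```python
import pyarrow as pa
import pyarrow.flight as flight

client = flight.connect("grpc://coordinator:8815")
info = client.get_flight_info(flight.FlightDescriptor.for_path("sales"))

tables = []
for endpoint in info.endpoints:
    # Each endpoint carries a ticket for one slice of the dataset,
    # plus the location(s) that slice can be fetched from.
    for location in endpoint.locations:
        worker = flight.connect(location)
        tables.append(worker.do_get(endpoint.ticket).read_all())
        break  # one location per endpoint is enough for this sketch

full_dataset = pa.concat_tables(tables)
```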
Right now, the way you ask a Flight server for data is either a dotted namespace, so you can say "list flights" and it will tell you all of the namespace-separated datasets that the flight server knows about, or you can even send it a SQL query: you send a full SQL query, the downstream server executes it and sends the results back.

More concretely, the parallel system looks like this. Say you have a Dremio cluster and a Python client: the Python client issues a get-flight-info to the Dremio coordinator, which returns a set of endpoints, and then the client knows to go to each of those endpoints with their specific tickets to get the data; in this case it goes to a bunch of Dremio executors. If the client is a Spark client, then you have your Spark executors facing off against your Dremio executors, and you're just getting the individual pieces of your data back into your Spark executors.

So that's Flight. Now let's talk about what the Spark source looks like.
Our Spark source is built off the relatively new DataSource V2 API that came out, I guess, a couple of Spark versions ago. It was a complete rewrite of the data source API and has a lot of really interesting, really exciting features for people building data sources, and as big a change as that was, there's an even larger change coming in Spark 3, so a lot of really cool stuff is happening for these data sources. For us, some of the most important stuff is the columnar support, so you can work directly with Arrow batches. There are also transactions, which aren't very important for this example but are really powerful: you can actually perform ACID-style transactions on your underlying data source, and if that's a Flight data source talking to a large-scale Dremio cluster or something like that, you're able to do some pretty powerful transactions. It's also easier to control partitions and how data source partitions map onto Spark partitions, and there's better support for pushdowns, which is really important for SQL sources.
So what does the Spark source for Flight look like? It leverages the columnar batch support as I said before, so you're actually pulling Arrow buffers directly from the Flight source into Spark's internal Arrow representation; Spark's columnar batch is actually using Arrow under the hood. It also fully supports pushdown: if your Flight server is able to understand SQL, then you can write generic queries against your Spark DataFrame and the source will translate them into SQL before sending them off to the Flight server. Currently, DataSource V2 is only capable of pushing down filters and predicates, so it doesn't support things like joins or aggregates. That does limit how much heavy computation you're able to push down to your downstream sources, but hopefully that will be added in the near future, and then you'd really be able to send arbitrary SQL down to your Flight sources from your Spark DataFrames. And finally, for our use case, we partition by Arrow Flight ticket: the Flight server generates a bunch of tickets, and those tickets are federated out as partitions in Spark, so the downstream system is actually able to control how best to partition the data given the query it's being sent.

So that's it for the Spark source; it's relatively simple. There's a link to the GitHub repo at the end of the talk, and I encourage everyone who's interested to go give it a try.
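To give a feel for how such a source is used from PySpark, here's a hypothetical sketch; the format name, option keys, and query are illustrative stand-ins, not the connector's confirmed API:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("flight-demo").getOrCreate()

df = (
    spark.read
    .format("flight")                        # assumed short name for the source
    .option("host", "dremio-coordinator")    # assumed option keys
    .option("port", "47470")
    .option("query", "SELECT * FROM sales")  # executed by the Flight server
    .load()
)

# Simple filters can be pushed down to the server as SQL;
# joins and aggregates still run in Spark under DataSource V2.
df.filter(df["amount"] > 100).count()
```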
For now, let's talk about benchmarks, the fun stuff. For this benchmark we worked on AWS: we took the most recent version of EMR and faced it off against Dremio AWS Edition. In both cases we had four relatively large nodes, so a relatively equal comparison of compute power, and then we ran a bunch of different data sizes. For each data size we perform some non-trivial calculation on the Spark side, and that's to make sure we use every row and every column, and to make sure that Spark and Dremio aren't playing games on us: when I was first working on this, Spark and Dremio were both smart enough to drop columns that weren't being counted at all. What that gives us is two times: the first is the time for the entire calculation, the Spark-plus-Dremio-plus-on-the-wire calculation, and the time in brackets is the on-the-wire time; everything is measured in seconds. Just focusing on JDBC versus serial Flight, you can see significant performance improvements immediately, on the single serial use case of Flight, so there's already a good argument for why Flight can be a good replacement for JDBC and ODBC in the future.
Then when we start talking about the parallel stuff, we can really see massive performance improvements: we're seeing orders of magnitude improvement over JDBC, multiple orders of magnitude in some cases. For a point of reference, when we're using eight nodes of both EMR and Dremio, I'm moving a billion records totalling something like 80 gigabytes of data, and that boiled down to something like four gigabits per second of bandwidth on each one of those nodes, which is actually a significant portion of the total EC2 bandwidth available. So at this point we're moving data pretty much as fast as we can move it on these modern cloud networks, which is really exciting, and I think this is what really makes, for me, the concept of these microservices, distinct isolated components all talking to each other, a real possibility.

So thanks, everyone, for watching and listening. I'm happy to take any questions, and here are some links to the GitHub repos.