Introduction and Initial Challenges
00:00:00
Speaker
A few years back, I was working at a startup that had an enormous product catalog that was constantly updated and people needed to search through it. And I got into a debate with a colleague of mine because the data was in MySQL and he wanted to start regularly exporting it out and shipping it across into Elasticsearch because he thought the search capabilities would be better.
00:00:24
Speaker
And I thought that's a great idea in theory, but making that run efficiently and reliably every day was just going to be an ongoing maintenance problem that our startup didn't need.
00:00:37
Speaker
We debated it back and forth, and in the end we went ahead, and it turns out we were both right. It was painful to set up, it was never quite fast enough to be up to date, and it was reliable except on the days when it wasn't, and those days threw all productivity out of the window. But it was worth it: it was a much better experience. It was painful, but it was worth it. Could it have been less painful?
00:01:04
Speaker
When you need, as so often you do, to pull data from these places, do something with it, and stick it into those places. Is there a good generalized solution to that problem?
Introduction to Apache Flink
00:01:15
Speaker
And that's today's topic. We're going to look at Apache Flink, which has been getting a lot of interest in this space. And it's designed to solve exactly that kind of problem. So I've pulled in Robert Metzger, who's on the PMC for Flink. And I'm going to get him to explain what Flink does and how it does it.
00:01:33
Speaker
And in this conversation, we managed to go all the way from why does Flink exist and how did he get involved to how sophisticated can you make the data processing part? What options do you have? What languages can you use?
00:01:47
Speaker
And how do you recover when the process crashes? What's its crash recovery and scalability story? And lots more. So let's talk about Flink. Let's ship some data around. I'm your host,
Guest Introduction - Robert Metzger
00:02:01
Speaker
Kris Jenkins. This is Developer Voices. And today's voice is Robert Metzger.
00:02:18
Speaker
Robert Metzger, thanks for joining us. How are you doing? Good. Thank you. I'm really excited to be here on the podcast to talk about Flink today. Yeah. I'm very excited to learn more about Flink because there's a lot of buzz about it, but I've never really sat down with an expert and really understood what's going on. So you can fill me in from scratch. Yeah, let's do it. Okay. So let's start with your credentials. Why are you a Flink expert? How did you get into it? And what do you do with
Robert's Journey with Flink
00:02:46
Speaker
I try to keep the story as short as possible. So when I was studying at the Technical University of Berlin, in like 2012, 2013 or so, I was working with a bunch of PhD students there who were building a big data system called Stratosphere that was competing with Apache Hadoop. Basically, they had a database background and thought that
00:03:14
Speaker
There are so many findings in 30 years of database research that have not really been considered with Apache Hadoop. They wanted to take these learnings and add them to a big data system.
00:03:31
Speaker
Then a few events happened: we open sourced this research project and donated it to the Apache Software Foundation. We started fundraising, and I became a co-founder of a company called Data Artisans, now called Ververica, that set out to commercialize Apache Flink as a technology. Back
Transition from Batch to Stream Processing
00:03:55
Speaker
when we started the company, we still intended to build a batch processing framework.
00:04:01
Speaker
It was actually an outside contribution that added a stream processing API on top of the engine. Because the engine, as I mentioned, is based on database principles, and databases use this concept called pipelined execution, where multiple operators are running at the same time. And that sounds very much like stream processing, right? Operators running all the time, processing data. So it was actually quite easy to add an API on top
00:04:28
Speaker
of this engine for stream processing primitives. And of course, then we added a lot more things to the engine, like checkpointing, event time and watermark support, the state backends, and so on and so forth, to make it a, let's say, real stream processor.
00:04:47
Speaker
And going back to this contribution: what we noticed was that there was a lot more interest in our stream processing capabilities compared to the batch processing capabilities that Flink was offering. And also, to
Success Stories and Accessibility Efforts
00:05:01
Speaker
some extent, I mean, Apache Spark was rising very quickly at that time as an alternative to Hadoop. And I think they were, let's say, winning the race in the batch processing space.
00:05:16
Speaker
Okay. And this is how I got into Flink and all of that: I was basically working on this stuff as a student and then started a company around it. I helped build the open source community as part of Apache, and we did a successful exit of the company to Alibaba. They are offering Flink in their cloud product.
00:05:40
Speaker
And like one and a half years ago, I switched to Decodable to work on making Flink more accessible as part of a stream processing platform, which is also based on Flink. So your entire career has been in Flink. Yes.
Flink's API and System Complexity
00:05:57
Speaker
Are you happy with that? Yeah, but of course, like after, I don't know, 10 years or so almost in the same technology, you start to wonder if you also need to look left and right. But of course, I mean, Flink is a really big project.
00:06:15
Speaker
not as big as the Linux kernel, but there are sub-systems in Flink, completely different APIs and abstractions, and you can go very deep in many areas. So there's a lot of very different areas in Flink to work on very different problems.
00:06:32
Speaker
Yeah. Any large enough project becomes a whole sub world, right? A little universe in itself. Yeah. Okay. So you've raised a lot of things we need to get into, but before we get there, just for context, give me a typical example of how someone uses Flink to solve some problem.
Flink's Real-time Data Processing
00:06:51
Speaker
Say you have some relational database like MySQL, and you're noticing that your full-text search queries are taking too much time on that database. So you want to offload these types of queries from your transactional database into something that is more purpose-built, something like Elasticsearch.
00:07:10
Speaker
So you would use Debezium for change data capture to follow the changes on this transactional database, then perform some joins in Flink to combine data from different tables into, let's say, one big document, and then you ingest these documents into Elasticsearch.
00:07:34
Speaker
So basically, you're using, and you're not using Debezium as a standalone system. You could do that and use Kafka in between. But in this case, Debezium would run as part of a Flink connector. So there's connectors in Flink that are based on Debezium that allow you to follow changes on a transactional database. And then you can use, for example, Flink SQL to
00:08:01
Speaker
express this join of multiple tables, and filtering, whatever, aggregations, and then you use an Elasticsearch sink in Flink to write this data to Elasticsearch. And in effect, you have something like a continuous real-time join from multiple different tables. And whenever there's an update, you basically instantly get an update in Elasticsearch, so your data is always fresh and up to date.
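To make the shape of that pipeline concrete, here is a plain-Python sketch of the idea (not Flink code; the table names and fields are invented, and plain dicts stand in for Flink state and for Elasticsearch):

```python
# Sketch of the continuous join: change events from two MySQL tables are
# combined into one document per user and upserted into a search index.

users = {}         # stands in for Flink state: latest row per user id
orders = {}        # stands in for Flink state: orders grouped by user id
search_index = {}  # stands in for Elasticsearch: doc id -> document

def on_change(table, row):
    """Apply one change-data-capture event, then re-emit the affected document."""
    uid = row["user_id"]
    if table == "users":
        users[uid] = row
    elif table == "orders":
        orders.setdefault(uid, []).append(row)
    if uid in users:  # join: emit a document once the user side is known
        search_index[uid] = {**users[uid], "orders": orders.get(uid, [])}

on_change("users", {"user_id": 1, "name": "Ada"})
on_change("orders", {"user_id": 1, "item": "book"})
```

Every incoming change immediately refreshes the joined document, which is the "always fresh" behavior described above.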
00:08:29
Speaker
Okay, so the main thing it's doing for you, in combination with Debezium, is slurping that data out of one place and doing a certain amount of processing to transform it as you load it into another place. I think the core
Data Sources, Sinks, and Connectors
00:08:44
Speaker
of Flink is the processing.
00:08:45
Speaker
The core really is how Flink is able to do this real-time join in this particular example. There are many other examples: you can also do machine learning stuff with it, you can do aggregations. But that's the core of what Flink is mostly about, real-time stream processing. The connectors are more a necessity to get your data in and out of Flink.
00:09:09
Speaker
Yeah, it's not going to do much without those. But the essence of the project is... There are other stream processors like Kafka Streams, for example, which are only working with Kafka as a data source, but Flink is independent of data sources. Okay. How many data sources and sinks does it support?
00:09:29
Speaker
So I think the core project has maybe like eight-ish and then there's probably like 20 or 30 that are in other repositories or somehow available publicly.
00:09:44
Speaker
But all the important stuff is covered, like Pulsar, Kinesis, Kafka. On the sink side, you can do any JDBC database, you can write to S3, or any file system actually, you can monitor for changes, you can write
00:10:01
Speaker
to these file systems in various bucketing, like time-based bucketing, database bucketing, whatever. So there's a ton of options for getting your data
Data Flow Graphs and Task Execution
00:10:12
Speaker
in and out. And to be honest, if there is a rare case where you have, like, a custom system, then it's also not terribly hard to build your own source or sink in Flink. Okay. Does it ever get used for, like,
00:10:26
Speaker
Is it always transforming the data and sending it to other places, or does it ever get used just for straight reporting? I mean, you can use Flink... So what do you mean by reporting? I guess what I mean is, could you have, like, a CSV or a PDF as your sink? CSV, for sure, that's supported out of the box. So you
00:10:54
Speaker
can write CSV or, more popular, Parquet files, for example, or you can write in the Iceberg format to S3 or something. So you can definitely use Flink also for getting your data into the shape that you want, like for loading your data warehouse, for example. Okay.
00:11:14
Speaker
I only asked that because I'm always thinking, like, what's the smallest possible change I could make to introduce this to a project. Yeah. Okay. In that case, I think we should dig down into how it actually works. What do I need to understand about Flink's architecture? So Flink
00:11:37
Speaker
has a few basic building blocks that you use to express your workload. So the core is a data flow graph, like a set of operators, let's say a Kafka source and a CDC source, and then a bunch of operators like transformations, filters, aggregations, joins.
00:12:04
Speaker
And they can be arbitrarily complex. Like trust me, they can fill entire screens if you visualize them with like hundreds of operators really. I mean, okay, hundreds is maybe an extreme case, but like 30, 40 operators is really not unheard of. Okay.
Windowing and State Management
00:12:23
Speaker
You use one of the APIs in Flink, like the Java or Scala API, the SQL API, or the Python API, to build this dataflow graph. So basically, to define the structure of your Flink job, and to define the logic, either with predefined operators. So Flink has, for example, windowing operators that allow you to window data into hourly buckets or something,
00:12:51
Speaker
or session windows, where you basically have dynamic windows that just have the size of a certain stretch of user activity. And so you define the structure, and then you also define the logic of what is happening inside these operators. And then the engine knows how to execute this graph efficiently on a cluster of machines.
00:13:17
Speaker
So if your workload is increasing, you can change the number of instances, the parallelism of your job in Flink, and then you will execute this on more machines.
00:13:29
Speaker
Okay. I'm just trying to think how that's going to look. So what do I do if I'm using SQL? Am I doing something like, create a thing that looks like a table, but is actually slurping from MySQL? Pretty much, yes. And then I can join to that in my FROM clause later? Exactly. So basically you're defining a stream as, like, a table in Flink SQL.
00:13:52
Speaker
So when you want to read from a number of Kafka brokers or whatever, like Kafka clusters, you do a CREATE TABLE statement for each of these data sources. So Flink SQL supports all the connectors that I've mentioned.
00:14:17
Speaker
If you have a Kafka topic called users, you do basically create table users, and then you define the fields in your Kafka topic. And let's say the data is in JSON, so you define like a JSON deserializer, and then Flink knows how to read the JSON data from your Kafka topic to analyze it.
00:14:39
Speaker
So you do, like, a CREATE TABLE to expose your Kafka topic in Flink SQL, and then if you want to do, for example, a filter, you just do a SELECT from this table, and then in the WHERE clause you can define your filter. Or you read from multiple tables and you join.
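In plain Python, the shape of such a continuous query is just a filter over an unbounded iterator; nothing is ever materialized. This is an illustrative sketch, not Flink code, and the field names are invented:

```python
# "INSERT INTO target SELECT * FROM users WHERE x < 10" as a generator
# pipeline: records flow through, matching ones are passed on.

def users_stream():
    # stands in for the Kafka-backed `users` table
    yield {"name": "Ada", "x": 3}
    yield {"name": "Bob", "x": 42}
    yield {"name": "Cas", "x": 7}

def select_where(stream, predicate):
    for record in stream:
        if predicate(record):
            yield record  # data just passes through; nothing is stored

target = list(select_where(users_stream(), lambda r: r["x"] < 10))
```

In the real engine the source never ends and the results are written to a sink continuously instead of collected into a list.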
00:15:02
Speaker
Right. So I'm going to make you flinch again, because I've got to mention CSV again. Do I also do, like, CREATE TABLE as output.csv and then INSERT INTO the output table SELECT * FROM...? Exactly. Yes. It just looks like regular SQL-type operations. Yes. But there's no underlying storage. The underlying storage is just outside of Flink.
00:15:30
Speaker
Exactly. Flink doesn't come with any storage. So you define S3 as a storage or local file system or whatever. Okay. Okay. So the next question that raises, you say it will automatically distribute the jobs across nodes. How is that working?
00:15:50
Speaker
So there's a central component. When you're building this dataflow representation in, let's say, the Java API or in SQL, you're typically doing this on some kind of client instance. So either it's the
00:16:07
Speaker
SQL shell that you're using, or it's a Java application that you're implementing, and the Java application knows how to connect to what is called the job manager process of Flink. So the job manager is the central component that is coordinating the execution of a Flink job, or actually also multiple Flink jobs. So Flink supports both single or multiple jobs on a job manager.
00:16:33
Speaker
And the job manager is creating a distributed representation of this Dataflow graph and is
00:16:48
Speaker
basically splitting it out. So if you have Kafka sources, some join operator, and a CSV sink, then it will create multiple instances of these operators. If you're running this on 10 machines, then it will create 10 Kafka sources, 10 join operators, and 10 CSV sinks.
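A minimal sketch of that fan-out, in plain Python (not Flink's actual scheduler; the operator names are just the ones from the example):

```python
# The job manager takes the logical dataflow graph and expands each
# operator into one task per unit of parallelism, to be placed on
# the task managers.

logical_graph = ["kafka-source", "join", "csv-sink"]

def expand(graph, parallelism):
    """Return the physical tasks: one instance of every operator per slot."""
    return {op: [f"{op}-{i}" for i in range(parallelism)] for op in graph}

tasks = expand(logical_graph, parallelism=10)
```

With parallelism 10, each logical operator becomes ten physical tasks, matching the "10 sources, 10 joins, 10 sinks" description above.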
00:17:09
Speaker
And the work will then get distributed from the job manager to the so-called task managers. So task managers are the workers that are running on all the cluster nodes. And as you add or remove task managers, you can scale up or scale down the resources that are available for processing. And can
Distributed Snapshotting and Consistency
00:17:30
Speaker
you do that dynamically? Can you just throw more machines at it?
00:17:33
Speaker
Yeah, that's a feature that we've added in Flink, I don't know, 1.13 or 1.14, maybe earlier. So it is a feature that has been added to Flink, let's say, recently. For a given value of recent. Two years ago or so. It's called the reactive mode in Flink, and it basically allows you to add or remove machines, and Flink will dynamically grow and shrink.
00:17:59
Speaker
Okay. And that's pretty neat if you're using something like Kubernetes horizontal pod auto scalers. So Kubernetes will monitor CPU usage. And if it goes too high, it will just add more machines. You can do the same with EC2 auto scaling groups or something.
00:18:17
Speaker
Okay, that makes sense. And is there, I mean, I'm just trying to figure out this SQL interface. Is this something like the notion of a temporary table? Can I join two things, stick them into one logical table while I think about how I want to select that data out?
00:18:37
Speaker
So you don't have to worry too much about the representation, like how Flink is representing a table or a join in your SQL statement. Internally, it really depends on what you're doing. Let's go back to the example that we had earlier, some user table,
00:19:04
Speaker
and you do, like, a SELECT *. And let's say you're reading from Kafka, you're filtering, and you're writing to Kafka. Then data is really just flowing through the system. It's never materialized in the system. It's just passing through, and we are filtering out some elements. Like, if you do an INSERT INTO target SELECT * FROM users WHERE
00:19:32
Speaker
x is smaller than ten, or whatever, then it will filter out all the elements based on that predicate. Okay. But of course, if you do something like a join
Event Time Processing and Watermarks
00:19:47
Speaker
and your input. So let's say you have this user table and you want to enrich some information in this user table from a database table. So you have this stream of user data and you have a MySQL database that contains
00:20:05
Speaker
let's say the address of each user. And you could do something like a database query for every user, like a query on MySQL for every user that comes, but of course you would bombard your MySQL server with
00:20:21
Speaker
queries for every incoming record in the stream processor. Your stream processor would actually be slowed down to the speed of your MySQL server. And that's not what you want for a massively parallel system like Flink. So what you do instead is change data capture on this MySQL table. So you
00:20:43
Speaker
do an initial snapshot of all the data in the MySQL table, load it into Flink state, and then you subscribe to further updates from that table. So whenever a user address is changing, we are updating this data in Flink state, and Flink can then do the lookup locally in its state backend.
00:21:04
Speaker
So you don't have to do any network access for this particular state lookup. It will just be in the local RocksDB instance of the task manager. So that's a lookup join in Flink SQL.
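Here's the lookup join as a plain-Python sketch (illustrative only; a dict stands in for the local RocksDB state, and the field names are invented):

```python
# An initial snapshot of the MySQL address table is loaded into local
# state, CDC events keep it current, and flowing user events are
# enriched with a local lookup instead of a query against MySQL.

address_state = {}  # stands in for the task manager's local state backend

def load_snapshot(rows):
    for row in rows:
        address_state[row["user_id"]] = row["address"]

def on_cdc_event(row):
    address_state[row["user_id"]] = row["address"]  # address changed upstream

def enrich(event):
    # local state lookup: no network round trip per record
    return {**event, "address": address_state.get(event["user_id"])}

load_snapshot([{"user_id": 1, "address": "Berlin"}])
on_cdc_event({"user_id": 1, "address": "Hamburg"})
enriched = enrich({"user_id": 1, "action": "like"})
```

Because the replica is updated by the change stream, the enrichment always sees the latest address without slowing the pipeline to MySQL's speed.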
00:21:24
Speaker
That's where Flink SQL will decide that one side of the join, the side with the address information from MySQL, needs to be stored in Flink state, but the user data that is flowing will not be persisted. But there are cases where you're joining two tables and you're materializing both sides of the join. And whenever either of the two inputs updates, you produce an output event that will be sent downstream, and then the sink has to decide what to do with it.
00:21:55
Speaker
Okay, that leads naturally to two questions. One's about time and windowing. But first: you're saying that, like for the address thing, you're maintaining what is essentially a local read-only replica of MySQL's address table? Yes, right. How is that state management working in Flink? Because you said Flink doesn't do any storage.
00:22:18
Speaker
So what's going on? Okay, I was lying to you. Let's just say it was for the sake of making it easy to understand. We can separate between logical and physical storage.
00:22:34
Speaker
The project itself doesn't provide any permanent storage like a MySQL server to persist your data in your database tables. But Flink needs a lot of temporary storage and that's what we generally call state.
00:22:52
Speaker
So state can be as small and simple as the current Kafka offset that you're reading from. And it can be as big as your entire address table from MySQL. So we have customers that are using Flink with like 250 gigs of state.
00:23:12
Speaker
in their joins. And even that is small. Like, if you talk to Apple or Netflix, which are using Flink in production, they're really talking about terabytes of state that they're maintaining across their task managers. Okay. And so there are a lot of things that we've implemented in Flink around state management.
00:23:35
Speaker
Because I think that's also why we say on the website that Flink is doing stateful stream processing. So this ability to maintain state inside the operators is what makes Flink really interesting. Because
00:23:51
Speaker
without state, Flink would just be a system that allows you to move data from left to right, but it could not really maintain or build up any knowledge about the outside world.
00:24:07
Speaker
So with Flink, you basically have memory: you know what happened before, and you can make decisions based on what happened before. In the case of this lookup join, you know what is in the MySQL table. And you know that
00:24:25
Speaker
Flink is always making sure that you're not losing any data. So even if you kill one of those task managers or multiple of these task managers, Flink is able to recover this state and restore it for you. And you will never notice that there was any loss of state temporarily because the task manager has died. Okay. This presumably is also the same mechanism you use to calculate aggregates that are live updating, right?
00:24:51
Speaker
Exactly. So you can store aggregates in state. Yes. We should probably talk a little bit about the difference here between
00:25:04
Speaker
static data and live data, because you say Flink was originally built for, like, batch files. Just to give a really simple example, I can say something like COUNT(*) of users, and I would expect that count to be gradually rising as people register on my site. So what's Flink's notion of a live stream of data, if everything's a CREATE TABLE statement?
00:25:36
Speaker
Or is all data in Flink, even the stuff that looks static, considered just a stream of data coming in? From an engine perspective, yes. So even if you're doing a classical batch use case: you have, I don't know, a few terabytes of data in your S3, and you want to join it and filter some data and then write it to Snowflake.
00:26:00
Speaker
And this is a batch job, a finite batch job. You start it, and Flink will execute it in a streaming fashion. So it will just start reading the files in S3, stream them through the join operators and whatever other operators you have, and then load them into Snowflake. And once it's done reading the files, it will shut down and the job will be done.
00:26:25
Speaker
Okay, so this is also treating batch as a special case of streaming, correct? Yes. Yeah. Okay. I'm assuming that keeps things simpler, but we should go deeper into how it processes, like, joins. How do you do streaming joins? What's Flink's notion of that?
00:26:45
Speaker
If you know that you're processing a finite stream, you can do more optimizations than when you're processing an infinite stream.
00:26:58
Speaker
So if you're doing batch processing, you have a finite stream. So you know that you can write all your data on disk. You will be able to, assuming that you have sufficient disk space, you can always assume that you're able to consume the entire dataset, write it to disk, do some operation, and then continue with the processing.
00:27:17
Speaker
In the streaming world, that's not possible. You want low latency. Data will never stop. Your disk will just run full at some point.
00:27:30
Speaker
So if you're doing a batch join and you know that your data set is finite, you can be more efficient because you can load your entire data set into memory or into on the disk and then, for example, sort it on disk and then do like a sort merge join of the data. You cannot do that in streaming because you cannot sort something that never finishes.
00:27:56
Speaker
You have to assume streams are infinite, so you can't possibly sort them. And what we are doing in SQL is that we're using windows if you need something to be finite. So if you want to chunk your data into daily batches in the stream processor, then you can do a daily window. So you collect data for an entire day, and only after a day do you do some analysis based on that one day's worth of data.
00:28:26
Speaker
And there are many different types of windows. So you can do what we call a tumbling window, where there are discrete windows: for every hour there's a window, if you do hourly windows. Or you can do sliding windows, where every 10 minutes you're triggering a new window, and the windows are one hour long, so they are interleaved. Then you can do session windows that are dynamic. And you can do any kind of custom window based on your needs. So that's
State Recovery and Minimizing Reprocessing
00:28:56
Speaker
Okay, you can custom define? I didn't know that. Yes, yes. So the windowing API... Flink has a built-in windowing operator that you can customize. So I think a typical customization is that
00:29:13
Speaker
You use a standard window, let's say a sliding window, but you trigger it more frequently. So theoretically, by default, you trigger the window when the hour is completed, if you're doing like an hourly window. But you could do something where you trigger the window every 10 minutes and then have like a final trigger after the full hour.
00:29:35
Speaker
And this way, you can get results faster. So you can basically show your user, this is the data of the current hour. It's in progress. And then at some point, you can tell your user, this is the finished hour. This is the final count or aggregation or whatever I'm doing of my data. And then there's even more complexity to this. And this is about handling of time.
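The window types mentioned above can be sketched in plain Python (this is an illustration of the assignment logic, not Flink's windowing API; times are in minutes):

```python
# A tumbling window puts each event timestamp into exactly one bucket;
# a sliding window (size 60, slide 10) puts it into every interleaved
# window that covers it.

def tumbling_window(ts, size):
    start = (ts // size) * size
    return (start, start + size)

def sliding_windows(ts, size, slide):
    windows = []
    start = (ts // slide) * slide  # last window starting at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)

hour_bucket = tumbling_window(125, 60)      # minute 125 -> the [120, 180) window
hour_slides = sliding_windows(125, 60, 10)  # hour-long windows, one every 10 min
```

An event at minute 125 lands in one tumbling bucket but in six interleaved hour-long sliding windows, which is why sliding windows produce overlapping results.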
00:30:03
Speaker
Because when you're building a window, you can do windows based on count of your data. You can also say, for every 1000 elements, I want to do some analysis. But I don't think many users do that. Most of the users do windows based on time. And if you're doing that, then you have to cope with the nature of time and out of orderness.
00:30:29
Speaker
So imagine your data is coming from mobile phones, and your users are sometimes in the underground or in an airplane, or they are out in the countryside and there's spotty reception. So, like when the user is liking stuff on Instagram, your likes will not always arrive
00:30:52
Speaker
when the user is clicking the button. The likes might arrive, like, five minutes later, when the mobile phone has reception again. Yeah. And if you've got a three-minute window, that's a problem. Exactly. So, I mean, it depends what you're doing. Flink has a lot of functionality for handling this exact use case. So Flink has a
00:31:21
Speaker
concept that is called event time. So you can tell the engine that in your data, one field is the time at which the event has happened. So imagine your Instagram user that is currently on an airplane is doing likes,
00:31:40
Speaker
whenever you click the like button, you assign the timestamp of the phone, of the user to this event. Now the user is landing two hours later and these events will arrive with a two hour delay. But the event time is the time when the event has happened, not the time when the data is arriving in Flink. And then you can, if this window is still open, like still available, you can assign this to the right window.
00:32:08
Speaker
Or you have some custom logic that is handling late events. Like events that are arriving out of order. So we're saying if you had a window that was a day long, then this is a non-issue. But if it's an hour long, you've already closed that window, shipped it off and said those are the results, right? So how do you backfill that with the old data?
00:32:32
Speaker
And so that's something that Flink cannot solve. It gives you the means to solve it, but it doesn't solve it for you in a magic way. So this is something that you have to solve as the author of the Flink job. So the window operator has multiple approaches. One approach is that you have a side output, like a special stream, where late events are sent.
00:33:00
Speaker
Then you run some custom code that is handling these late events. So imagine you're inserting these hourly aggregates of the number of likes, whatever per country or so, into Snowflake. Then you could run a query in this special code that is updating the aggregate.
00:33:23
Speaker
So you know that the count from two hours ago is already closed, but you get this information that there is one late user, and you just update the count by one in Snowflake. So your custom code is literally just running an update statement where the time slot equals...
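A plain-Python sketch of that side-output pattern (illustrative only; the dicts stand in for the window operator's state, the published table, and the side-output stream):

```python
# Events for a still-open window are aggregated normally; events for an
# already-closed window go to a side output, and custom code patches the
# already-published aggregate (standing in for an UPDATE against Snowflake).

published = {"13:00": 100}   # closed, already-written hourly like-counts
open_windows = {"14:00": 0}  # windows still collecting data
late_events = []             # the side output stream

def on_event(window, likes=1):
    if window in open_windows:
        open_windows[window] += likes
    else:
        late_events.append((window, likes))  # too late: route to side output

def handle_late():
    while late_events:
        window, likes = late_events.pop()
        published[window] += likes  # the "update the count by one" step

on_event("14:00")
on_event("13:00")  # arrives after its window was closed and shipped
handle_late()
```

The engine only routes the late event; the patch-up logic is the job author's responsibility, as described above.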
00:33:41
Speaker
Yeah, because, I mean, the trade-off when working with out-of-order events, or like late arrivals, is always latency. So theoretically, you could keep your hourly windows, and this is actually what happens in Flink. So if you do hourly windows with event time in Flink, then Flink will keep multiple of these windows in its local state,
00:34:08
Speaker
because it's basically collecting data for multiple hours at the same time. And only every now and then, Flink decides that a window is done and closes it, and then it does its computation on the window data and sends the result downstream.
00:34:29
Speaker
Okay. So there comes a point where you say to the user, either you have to figure out a special way to handle this, or we can just drop it on the floor and say it's too late. Yeah. Okay. Yes. And this is all configurable by the user. So you can configure how long after a window has been closed, you want to still keep it in case of late arrivals.
00:34:56
Speaker
And you can also define how Flink decides when a window is ready to be closed. Because that's another set of issues. If you're building hourly windows, how does Flink know when the hour is complete when it's working on event time? And for that, it's using a concept called watermarks.
00:35:22
Speaker
Low watermarks, specifically. Every source in Flink is emitting these watermark events into the stream. These are special records that you cannot immediately see when you're implementing your dataflow, but they allow you to track event time across your operators. So,
00:35:51
Speaker
let's say in the Instagram likes example with event time: at some point, the source where you're consuming these events from in Flink, like these like events from a Kafka topic, will decide how far time has progressed. So it will basically say,
00:36:20
Speaker
Based on the data that I've seen so far, I think that 2 p.m. has passed now.
00:36:26
Speaker
Right. Yeah. I've seen at least one event from 2 p.m. So it must be later than that. Exactly. This is one approach. So you can say you basically just follow the highest time you've seen. And that's basically
Managing Checkpoints and Savepoints
00:36:39
Speaker
your virtual clock. What people typically do is add some lag to this. So you say, I saw an event for 2 p.m., but I'm actually
00:36:54
Speaker
subtracting half an hour from this, so that the event time is always lagging a bit behind, like half an hour behind, so that you can account for late arrivals. So if you know that you're willing and able to tolerate up to half an hour of lateness, then you define your watermark so that it is always trailing the event time that you're tracking by half an hour.
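That lagging-watermark idea fits in a few lines of plain Python (a sketch of the mechanism, not Flink's WatermarkStrategy API; times are in minutes):

```python
# Track the highest event time seen so far and subtract a fixed allowance,
# so late arrivals up to that bound still land before a window is declared done.

LAG = 30            # minutes of out-of-orderness we are willing to tolerate
max_event_time = 0  # highest event timestamp observed so far

def observe(event_time):
    """Update the virtual clock with one event and return the watermark."""
    global max_event_time
    max_event_time = max(max_event_time, event_time)
    return max_event_time - LAG

def window_complete(window_end, watermark):
    return watermark >= window_end

watermark = observe(14 * 60)                     # an event stamped 2 p.m.
noon_done = window_complete(13 * 60, watermark)  # is the 12:00-13:00 window done?
```

Seeing a 2 p.m. event moves the watermark to 1:30 p.m., so the noon-to-1 window can close, but the 1-to-2 window stays open for stragglers.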
00:37:18
Speaker
This has reminded me of meetups that say they're going to start at seven, but they have to start at seven fifteen because no one shows up at seven. Yes, exactly. Like everybody who has ever hosted a party at home knows that you cannot start on time. Yeah.
00:37:33
Speaker
And another approach that you can take here, for example, is that you build a histogram of the distribution of how late events arrive. And then you say, I want my watermarks to cover 95% of the events,
00:37:54
Speaker
or 99% of the events or whatever, and you're willing to tolerate 1% loss, or you're willing to handle that 1% of late events using some custom logic. Right. Yeah. Okay. So that leads naturally into my next question: that's how you deal with user space being unpredictable in its reliability.
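The histogram approach can also be sketched in a few lines: measure how late events actually arrive, then choose the smallest watermark lag that would have covered the desired fraction of them. Again an illustration of the idea, not Flink code:

```python
import math

# Pick the smallest lateness bound that would have covered `coverage`
# of the observed events, from measured latenesses (arrival time minus
# event time, in seconds).
def lateness_bound(observed, coverage=0.95):
    ordered = sorted(observed)
    covered = max(1, math.ceil(coverage * len(ordered)))  # events to cover
    return ordered[covered - 1]

# Nineteen events at most 20 seconds late, plus one extreme straggler.
samples = [1, 2, 2, 3, 3, 4, 5, 5, 6, 7, 8, 9, 10,
           11, 12, 13, 15, 18, 20, 3600]
print(lateness_bound(samples, coverage=0.95))  # 20: the straggler is tolerated as "late"
```

With a 20-second watermark lag you'd handle 95% of these events in-window and route the straggler through whatever custom late-event logic you have.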
00:38:21
Speaker
How do you deal with machine space being unreliable? What happens if I'm Netflix and my 250 gigabytes of state crash and die? What's going on internally there?
00:38:38
Speaker
Let's say the 250 gigabytes of state that Netflix is maintaining are actually one hour windows. To stick to the example. You're so big, because you're Netflix, that your state is 250 gigabytes. If any machine is failing,
00:38:58
Speaker
Flink will restore the state from the latest checkpoint. So Flink is periodically creating checkpoints that contain the state of all operators in your Dataflow graph.
00:39:22
Speaker
Let's say we have a Kafka source. The state in the Kafka source is the names of all the partitions and topics you're subscribed to and the offset of the latest message that you've read.
00:39:35
Speaker
And then you have this window operator, and there it's all the data that is in the windows. So it's either some aggregate per hour, or it's all the data from the hour. And then there is a sink, let's say again a Kafka sink. This might also store some state if you're using the exactly-once Kafka sink. And you configure a checkpointing interval,
00:40:02
Speaker
which means that at this interval, Flink makes a backup of the state to some reliable storage outside of Flink. Typically these days it's something like S3 or whatever your cloud provider is offering, like a cheap durable storage that you're uploading your checkpoints to.
00:40:26
Speaker
In our example, we could configure something like: checkpoint every five minutes. Every five minutes, upload the state of all the operators, in particular those 250 gigabytes of data from the window operator, to S3.
00:40:49
Speaker
Okay. Does that mean it's snapshotting 250 gigabytes every five minutes? Or is it incremental? Is it clever in that? Yeah, it's clever. It supports incremental checkpoints that only contain the diff from the last checkpoint. Okay. And so what happens when that goes wrong? If I've got one of your 30-operator pipelines, maybe running across 60 machines, and one of them in the middle dies.
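The "only the diff" idea mentioned a moment ago can be illustrated with a toy sketch. Note this is just the concept: Flink's real incremental checkpoints track changes at the level of RocksDB's immutable files rather than individual keys, and also handle deletions.

```python
# Toy illustration of an incremental checkpoint: instead of uploading the
# whole state, upload only the entries that are new or changed since the
# last checkpoint. (Deletions are ignored in this simplified version.)

def incremental_checkpoint(previous_state, current_state):
    """Return only the entries that differ from the previous checkpoint."""
    return {k: v for k, v in current_state.items()
            if previous_state.get(k) != v}

old = {"window-13:00": 120, "window-14:00": 7}
new = {"window-13:00": 120, "window-14:00": 42, "window-15:00": 1}
print(incremental_checkpoint(old, new))
# {'window-14:00': 42, 'window-15:00': 1}: the unchanged window is not re-uploaded
```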
00:41:21
Speaker
I'm just trying to think how you coordinate all those different snapshots so that you get everything and don't reprocess something twice. You will reprocess data twice in case of a failure, but you won't count data twice. You won't have duplicates. This is the reason why Flink is considered or
00:41:47
Speaker
I call it, an exactly-once stream processing framework. So it is exactly-once with respect to state, but it might still process data multiple times. If a single operator in your Flink pipeline fails, it will reset the entire pipeline to the last successful checkpoint.
00:42:13
Speaker
There is an optimization in Flink called local recovery, where only the failed machine has to redownload its state. On the other machines you can just reuse the files that are still on your local file system. The checkpointing files are available on your local disk, so the recovery is not as painful as it sounds; you don't need to redownload 250 gigs.
00:42:36
Speaker
I don't know, you're downloading like 15 gigs or whatever for that failed machine and then you can continue processing. So the trick to how we achieve exactly-once semantics is the way we create
Exploring Flink's APIs
00:42:54
Speaker
these checkpoints, using a distributed snapshotting mechanism. So we are not just naively uploading whatever data we currently have on each operator, because the moment at which each upload happens might be off, right? You might have some messages in between that get processed multiple times. Instead, we can guarantee that this checkpoint is consistent across the dataflow. How this works is that we are
00:43:21
Speaker
inserting a special event into the stream that is called the checkpoint barrier. So if you have the Kafka source, window operator and sink, then Flink triggers a checkpoint on all the source instances via an RPC message. It just says: now do a checkpoint.
00:43:44
Speaker
So the source will get this message. It will stop processing, it will upload its state, and then it will emit a special record downstream that says: here's the checkpoint. And then it will continue processing. So in the output of the source, imagine a queue: there are regular events, then there is a checkpoint event, and then there are regular events again. And this checkpoint event will travel downstream to, say, a join operator. Now the join operator
00:44:13
Speaker
will have multiple inputs, right? You have this as a distributed graph, and it has multiple sources. So the join operator will have to wait. It will receive a checkpoint barrier on one of the input channels, and then it will stop accepting new data from that input channel, and it waits until it has received a checkpoint barrier from all the input channels. This is called the alignment phase of a checkpoint. And once it has received a barrier from all the inputs,
00:44:43
Speaker
It will create again a copy of the state of this operator. In this case, it will upload 250 gigabytes of state.
00:44:51
Speaker
And then the barrier is traveling through the system downstream to the next operator, which will repeat this until we reach the sink. And once we've backed up the state of the sink, the checkpoint is conceptually closed. We have created a consistent copy of all our state at this particular point in time. And records are not allowed to overtake these checkpoint barriers.
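The alignment phase just described can be simulated in a few lines of Python. This is a toy model, not Flink code: an operator reads from two input channels, blocks each channel as soon as the barrier arrives on it, and snapshots its state only once the barrier has arrived on all of them, so the snapshot contains exactly the pre-barrier events.

```python
# Toy simulation of checkpoint barrier alignment on a two-input operator.
# (The simulation stops at the snapshot; a real operator would then
# unblock its channels and keep processing the post-barrier events.)

BARRIER = "BARRIER"

def run_operator(channels):
    """channels: list of per-channel event queues. Returns the events
    processed before the snapshot and the snapshot itself."""
    processed = []
    blocked = [False] * len(channels)
    positions = [0] * len(channels)
    while not all(blocked):
        for i, queue in enumerate(channels):
            if blocked[i] or positions[i] >= len(queue):
                continue
            event = queue[positions[i]]
            positions[i] += 1
            if event == BARRIER:
                blocked[i] = True  # alignment: stop reading this channel
            else:
                processed.append(event)
    snapshot = list(processed)     # barriers seen on all inputs: snapshot now
    return processed, snapshot

channel_a = ["a1", "a2", BARRIER, "a3"]  # a3 arrives after the barrier
channel_b = ["b1", BARRIER, "b2"]
processed, snapshot = run_operator([channel_a, channel_b])
print(sorted(snapshot))  # ['a1', 'a2', 'b1']: post-barrier events are excluded
```

Because records are never allowed to overtake the barrier, the snapshot cleanly separates everything before the checkpoint from everything after it.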
00:45:15
Speaker
And because of that, even if we have a failure now, we reset the Kafka sources to the offsets they had at the time of the checkpoint, and the contents of the window operator also to exactly the same point in time. So we reset the state and the data, and they are in line with each other. This is how we can guarantee that we are not counting twice. So if your window operator is counting the number of records, we reset the counter to the right count
00:45:43
Speaker
at the time of this particular offset of the Kafka sources. So we replay the data from Kafka and the counter will go back a little bit and then we will recount and continue processing after we recovered from the failure. Right. So if I'm further down the stream in the operator pipeline,
00:46:03
Speaker
I'm going to get, hey, it's time to do checkpoint number three and I do checkpoint number three and I get some more rows. And then presumably there's another message that says, hey, I crashed. We're all going back to checkpoint three and I'm going to resend you rows from that point. Yes. Yeah. Okay. Yeah. Okay. I can see how that makes the whole system consistent with respect to checkpoints.
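A toy model of that rewind, assuming a counting operator over a Kafka-like log: the checkpoint stores the source offset and the counter together, so after a crash both are reset to the same point and replay cannot double-count. Illustrative only, not Flink code.

```python
# Simulate a counting job over a log, with periodic checkpoints that
# snapshot the source offset AND the operator state together, and one
# crash partway through.

def run_with_failure(events, checkpoint_every=3, fail_at=5):
    offset, count = 0, 0
    checkpoint = (0, 0)
    iterations = 0
    while offset < len(events):
        if iterations == fail_at:
            offset, count = checkpoint          # crash: reset state AND position
            iterations = fail_at + len(events)  # ensure we only fail once (simulation)
        count += events[offset]
        offset += 1
        if offset % checkpoint_every == 0:
            checkpoint = (offset, count)        # consistent snapshot of both
        iterations += 1
    return count

events = [2, 3, 5, 7, 11, 13, 17]
print(run_with_failure(events))  # 58: same as the failure-free sum, no double counting
```

Some events between the checkpoint and the crash are processed twice, but because the counter rewinds with the offset, the final result is exactly-once with respect to state.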
00:46:26
Speaker
And yes, these checkpoints are done asynchronously. These 250 gigabytes of state in our window are of course our problem child that we need to give special treatment. And what Flink does for the special treatment is this:
00:46:42
Speaker
when you're using the RocksDB on-disk state backend, and also the memory-based state backend, it will create only a, let's say, lightweight snapshot of the data, a view of the data at that point in time, and it will continue with the processing and upload the data to S3 in the background.
00:47:02
Speaker
So the barrier can actually flow through the system quite quickly. It is only there to make sure that we are creating these state snapshots at the right point in time. And then in the background, there will be upload processes sending the files to S3. And only when the barrier has reached all the sinks
00:47:23
Speaker
do we need to wait for all the asynchronous uploads that are happening to finish as well; only then is the checkpoint considered complete, and then we can use it for recovery. So if we fail at any point in time, we just always go back to the last successfully completed checkpoint.
00:47:40
Speaker
So at that point you could nuke the cluster, reboot it, and expect to get back to where you were. Exactly. Yes. Okay. And so this is the mechanism that Flink uses internally for recovery, but there are also user-managed checkpoints, and they are called savepoints. So you can also, as a user, say,
00:48:02
Speaker
You basically use the CLI or the REST API or whatever to trigger a savepoint. As a user you say, hey, create me a copy of my state right now and write it to this directory. So triggering a savepoint is
00:48:20
Speaker
like triggering a checkpoint out of band. You're creating a special copy of your state and storing it somewhere. And this allows you to go back in time. So if you create these savepoints, say, once a day, and you realize there was a bug in my code, then you can go back and say, okay, I'm replaying from the savepoint from three days ago with the code that fixes the bug. And then with event time, you will actually get exactly the same result.
00:48:51
Speaker
It's stream processing, it's supposed to be real time, but you can use it also for historical reprocessing or backfill: go back to that point in time. So presumably this gets used a lot just before people are deploying on a Friday afternoon.
00:49:08
Speaker
Yeah, exactly for this it's useful. It's also useful if you want to migrate your state from one cluster to another. So imagine you just need to change your infrastructure for whatever reason. You can create a savepoint and restore the savepoint on entirely different hardware.
00:49:27
Speaker
Or if you want to change the Flink version, like there's a new bug-fix release: you create a savepoint and then you restore with the fixed Flink version. And you can even use that to fix your job. So if you have a bug in your code and you want to fix it, you can restore the savepoint with a different, fixed version of your job.
00:49:48
Speaker
Okay. So you do the whole thing along with a job. Is job the right terminology? Yes. We tried to change the name to application, like stream application, but job stuck throughout the code and the documentation and in people's minds. So it's both: job or application. Right. Okay.
00:50:08
Speaker
Okay. So we're getting into what the developer experience is like for this, then, maintaining it. But we've mostly been talking as though this is a SQL-only world. Yeah, it's not. Tell me about the hierarchy of ways I can interact with Flink. So there are two big APIs: the SQL API and the
00:50:37
Speaker
DataStream API, in Java, Scala, Kotlin, whatever JVM language. And the common ground, I would say, is the DataStream API, which allows you to do high-level and low-level operations.
00:51:01
Speaker
You define your sources in Java. So you just define, like, a Kafka source and a Debezium source and a Pulsar source, and then you define a window operator and then a sink in Java, and you can run your own code, of course, in the maps and filters and so on.
00:51:21
Speaker
You can go one level down from that and use what we call the process function in Flink. And the process function allows you to access state directly. So you can say, I have a list state or a map state or value state. So that's basically like a Java field in your code. But this field is backed by the state backend.
00:51:45
Speaker
So you can store terabytes of data in this field, even if your machine only has like 16 gigs of RAM or so. Because the data that you put into this list, for example, is actually written into RocksDB. And RocksDB means it's stored in a special file format on your local SSD, ideally. And if you're putting this into this state abstraction in the Java API, Flink will also checkpoint your state. So it will be recovered and it will be exactly once.
00:52:15
Speaker
like the mechanism that I've described. So the high-level window operator that is available in the DataStream API is also using these state primitives. And you can access these state primitives in what we call the process function in Flink.
00:52:28
Speaker
And another benefit of this low-level process function is that you can also access time. So you can access the event-time field of every record, like the current timestamp, and you can register timer callbacks. So you can say, wake me up in 30 minutes.
00:52:53
Speaker
And then I want to close my window or do some kind of whatever. So you can define callbacks, and when the time has arrived for this callback, you get a special method call and you can do stuff like closing your window, for example. And this timer is either based on what we call processing time, like wall-clock, real-world time, or it's based on event time. So even if you're saying whatever, 30 minutes, for this timer,
00:53:22
Speaker
the system decides when to execute this timer. Okay, so I can do something like, I'm thinking, if I modeled an auction: I can accept bids, but when I get a message saying the auction has now closed, then all future bids become irrelevant. The window closes at that point.
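The auction idea can be sketched in the process-function style without any Flink at all. This is a plain-Python illustration of the logic; the class and event shapes are made up, and in real Flink the two fields would be list state and value state managed by the state backend:

```python
# Process-function-style sketch of the auction: keep bids in state, and
# once the "auction closed" event arrives, emit the winner and ignore
# any bids that turn up afterwards.

class AuctionProcessFunction:
    def __init__(self):
        self.bids = []        # stands in for Flink list state
        self.closed = False   # stands in for Flink value state

    def process(self, event):
        """Returns the winning bid when the auction closes, else None."""
        if event == "CLOSE":
            self.closed = True
            return max(self.bids) if self.bids else None
        if not self.closed:   # bids after close are irrelevant
            self.bids.append(event)
        return None

auction = AuctionProcessFunction()
results = [auction.process(e) for e in [10, 25, 17, "CLOSE", 99]]
print(results)  # [None, None, None, 25, None]: the late bid of 99 is ignored
```

In Flink you'd also register an event-time timer as a deadline, so the auction closes even if the explicit close message never arrives.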
00:53:42
Speaker
Something like that. And this is custom windowing logic. You can implement this yourself with the process function; you have all the ingredients you need. You have access to the time of the bids, and you have the state, like list state, to store your bids in Flink state. So this
Getting Started with Flink
00:54:01
Speaker
is what people generally do? They write SQL until they hit something the SQL API doesn't support, and then they break out Java or Scala?
00:54:14
Speaker
It depends. Let me finish a thought about the API. DataStream is the common API, the process function is the low-level API, and SQL is of course the high-level API. You can use the SQL API within a Java program as well. You can basically create your sources in Java, then put in a SQL string to do whatever you want, and then continue in Java. You can also mix and match
00:54:39
Speaker
the different APIs. Or you can go full SQL: just use the SQL shell, and you create your data sources and queries and everything in SQL; you don't have to touch Java at all. Now, there are also extension points in SQL, like user-defined functions.
00:54:58
Speaker
So besides the built-in functions, like concat and average and whatever, you can also define your own functions and then implement them in Java, or you can even use Java to delegate to Python or whatever language you want.
00:55:15
Speaker
So you can extend Flink SQL. What I see as a pattern for companies building stream processing platforms is that they offer, let's say, business-specific user-defined functions as part of their streaming platform. Okay. So there's a department responsible for shipping a library of things everyone would find useful in SQL, that kind of thing. Yeah. Yeah. Okay.
00:55:44
Speaker
Where do people tend to start with Flink? I mean, what's a typical way it gets introduced to a company? So I think that, I mean, it depends a bit
00:56:01
Speaker
on your skill set, like whether you're a Java developer or a SQL developer. Or whether you're a lazy Java developer who knows that you can solve the problem in SQL; that's totally fine and makes a ton of sense. Why not? And honestly, SQL is highly optimized, of course, right? There's an optimizer that knows how to build an efficient dataflow,
00:56:24
Speaker
and it uses optimized data types and code generation and all kinds of tricks to make this really efficient. What you can express in like 15 minutes of trial and error with SQL, you would probably spend a day or so in Java to make really efficient. So it makes a ton of sense for many use cases to use SQL, and you can totally use Flink
00:56:48
Speaker
without knowing Java. You just define your data sources and sinks and write your queries, and you can do quite a lot with Flink SQL. So one way of getting started with Flink, if you just want to do SQL, is to go to the Flink documentation and check out how to use the SQL client. You start Flink locally: you just run a bash script that brings up a job manager and a task manager instance,
00:57:15
Speaker
and then you start your SQL client, it connects to the job manager, and then you can write your queries with it. If you're a Java developer,
00:57:26
Speaker
there's a different path that I would recommend, and that is: you create a Java project, you add the Flink dependencies to your project, and then in your main method you define this Flink execution environment and trigger the execution right there, locally, in your IDE. So it will bring up the same components that you're running on a big
00:57:54
Speaker
100-machine distributed cluster, on your local machine. So you can really run exactly the same code that you run on the big cluster locally as well. It will bring up
Open-source Community and Engagement
00:58:05
Speaker
a job manager thread and a task manager thread, basically, in your machine. And then... So it's not just running like a local client-side part of Flink. It's running the whole of Flink.
00:58:19
Speaker
as you wish. You can, of course, use your developer machine to connect to a Flink cluster and submit your job to a big Flink cluster. But for development, if you're just getting started and you're not that sophisticated, you just go and press the Run button in your IDE and it will run everything for you. You can bring up the Flink web UI to investigate what's going on, you can configure checkpointing, you can configure the
00:58:45
Speaker
RocksDB state backend; all of that stuff also runs locally. And if you're a developer, that's my recommended way of getting started. It feels like developing against any other library: you just play with the code and add log statements and
00:59:03
Speaker
put breakpoints for your debugger, and it's a pretty neat development experience. And once you're done with the implementation of your Flink job, you basically create a jar out of it and then you upload it to your Flink cluster, where it gets executed. And you can even do things
00:59:24
Speaker
like profiling on your local machine. So if you notice that there's some performance issue potentially on your cluster, you can run it locally again and attach a profiler to see where you're spending most of your CPU cycles.
00:59:36
Speaker
And another benefit of this, again, advanced use, but you can also use this local execution for testing, for integration tests, for example. So if you want to make sure that you're not breaking your production code, you can run a Flink mini cluster in your unit tests to make sure that you're not breaking anything.
00:59:58
Speaker
Okay. And can I stub out my MySQL database with just a local static file source? Yes, exactly. Yes. So in the Java code, there's an abstraction called the DataStream, and a source produces a DataStream.
01:00:14
Speaker
So you can just put this behind a method, and then the method produces or creates a stream of whatever you want. So you can implement a simple data generator in Java that produces data that looks like the real data, and in production you replace it with a Kafka source. For SQL, there is a data generator available as a source. So you specify your schema, and from the schema the data generator works out how to generate data for you.
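The idea behind that schema-driven generator is easy to mimic for plain local testing. A hypothetical sketch in Python (the schema format and field names here are made up for illustration; Flink SQL's real equivalent is its built-in datagen source):

```python
import random

# Build a generator function from a simple {field: type} schema, so tests
# can produce plausible records instead of reading from Kafka.
def make_generator(schema, seed=42):
    rng = random.Random(seed)  # seeded for reproducible test data
    def generate():
        record = {}
        for field, ftype in schema.items():
            if ftype == "int":
                record[field] = rng.randint(1, 1000)  # positive numbers
            elif ftype == "string":
                record[field] = "user-" + str(rng.randint(1, 99))
        return record
    return generate

gen = make_generator({"user_id": "int", "name": "string"})
sample = gen()
print(sorted(sample))  # ['name', 'user_id']
```

Swap the generator for the real source behind the same method boundary and the rest of the pipeline never knows the difference.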
01:00:44
Speaker
Oh, sweet. Well, I mean, it basically finds, like, an integer field, and you say, I want to generate positive numbers or something. Right. Groovy. I have to ask about this: is it any language I like, as long as it's the JVM? Yes. Plus Python. Oh, okay. There's a Python. Yes. So there's
01:01:07
Speaker
a Python API as well for Flink, and it is fairly close to the SQL API, or the SQL abstraction. Between Flink SQL and the DataStream API there is something called the Table API, and that's a Java way of saying SQL, basically. So you can do a select, a where, whatever, in Java.
01:01:35
Speaker
And I think that the Python API is similar to that. And of course you can run Python code. So you can also do a transformation with regular Python code or an aggregation as regular Python code in your user defined function. And is it feature parity or does it lag behind the Java version? Or has it diverged?
01:01:59
Speaker
Yeah, I would say it has diverged. Yes. Okay. So, I'm not 100% sure, I have little experience with it, but it is fairly close to the SQL API, like the Table API. Okay. Okay. I need to go and check it out then. On which note, I just head to flink.org, right? Uh, flink.apache.org. So it's part of the Apache Software Foundation.
01:02:26
Speaker
That saves me asking you what the licensing terms are. Exactly. Robert, thanks very much for taking me through all that. And I will see you again. Cheers. Yeah, I really enjoyed the
Conclusion and Audience Interaction
01:02:36
Speaker
conversation. Thank you. Robert, thanks again. I have to say, Robert's inspired me. Since we recorded that conversation, I've started using Flink in anger on some analytics stuff that I've been working on. Maybe at some point I'll show you. Maybe we'll do some coding videos for YouTube eventually.
01:02:53
Speaker
Something I'd like to do, time allowing. Another thing I'd like to do, and I think this time will allow, is a deep dive on more Apache projects. We've covered a few of them already on Developer Voices. Long term, I think we'd like to cover a lot more because there are lots of interesting ones. So if there's a particular Apache project you think I should be looking at, please let me know in the comments.
01:03:17
Speaker
On the topic of comments, if you've enjoyed this episode or found it useful, please take a moment to click like or share or rate or review, whichever feedback-ish kind of buttons the platform you're listening to this on offers.
01:03:33
Speaker
Developer Voices is available on YouTube, Spotify, on Apple, all the usual podcasting places. They all have different buttons. Please go and find one. I am looking for your feedback on all of them. In fact, I might have some of that feedback data traveling through Flink just between you and me. So why not give me some more datums to run through?
01:03:55
Speaker
On which note, I think it's time for me to leave you and for both of us to get back to the keyboard. I've been your host, Chris Jenkins. This has been Developer Voices with Robert Metzger. Thank you for listening.