I suppose since flooring and parquet files are built out of blocks, there is some sort of joke here. Not sure what it is though…
I was looking at my previous post on Athena, and wondering how the cost of doing such analytics is driven very much by the cost of scanning all the data. Someone suggested I take a look at ORC files.
Now whilst the possibility of getting it wrong and having the hordes of Mordor come erupting out of the screen was low, it is not an impossibility. The true limits of what cloudy stuff and Terraform can do are as yet untested… But I digress.
If you think about it, a CSV file is a bloody awful way to store data for processing. Sure, it makes sense for humans because that's how we would write data down when we collect it, but it makes little sense for processing. Most of what we want to process is in columns – we tend to want to add up lists of expenses, or find the smallest value in a list of numbers, or work out a deviation or range, or find all the people in the phonebook called Jones…
A CSV file is not the way to do this. Even in a really simple file like a list of expenses there is a lot of data that is not needed:
EmployeeID  Name   PurchaseID  Amount
1           Smith  34324       182.20
2           Jones  35122       23.21
3           Patel  39123       1923.00
1           Smith  39441       84.67
1           Smith  39441       34.22
3           Patel  39124       518.34
4           Singh  43321       2885.23
To total up the purchase amounts, you would have to read the entire CSV file because access is sequential – this means you read and discard perhaps four or five times as much data as you need. A CSV cannot be read down as a list of columns, because the data is split by end-of-line markers – meaning the file has to be accessed row by row.
If we turned that on its head and stored the data in the file by columns rather than rows, we could do exactly that. This is the sort of format that ORC files use, as does another file format called Parquet.
Parquet files are binary – they are not directly viewable. When you create them, the data is stored in a columnar format, and it is also (usually) compressed using one of several different algorithms, which saves space and transfer costs. Getting data into Parquet format, though, is trivially easy if you have Python installed.
Here I have taken one of the large CSV files from my previous post and loaded it into a Python session. It does have to fit in memory – pandas, if you recall, needs to hold everything in RAM, so there is a limit. However, reading it in and then writing back out to a Parquet format file is simplicity itself.
The engine and compression arguments are optional, but I would recommend including them. Snappy is, as the name suggests, quick: fast with modest compression. If you use gzip instead, expect about 30% more reduction in size for not much longer decompression time.
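The conversion looks something like this – a minimal sketch with placeholder file names; the column name conversion is my addition, since Parquet wants string column names and a header-less read gives pandas integer ones:

import pandas as pd

csv_file = "wsprspots-2024-01.csv"          # placeholder input
parquet_file = "wsprspots-2024-01.parquet"  # placeholder output

# The WSPR dumps have no header row, so read without one.
df = pd.read_csv(csv_file, header=None)

# Parquet requires string column names; convert pandas' default integers.
df.columns = df.columns.astype(str)

# engine and compression are optional; swap "snappy" for "gzip" to trade a
# little decompression time for roughly 30% smaller files.
df.to_parquet(parquet_file, engine="pyarrow", compression="snappy")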
For very large files that will not fit in memory, you can use the polars library (why polars? Because it's a bear bigger than a panda – library programmer humour, I guess…). This has a disk based solution that will stream data off one disk and onto another, converting on the fly using very little RAM. It works best if you have one SSD for reading and another for writing, and it will take much longer than a RAM based conversion, but if it won't fit in RAM, there's little choice…
import polars as pl
# Stream the CSV straight to Parquet; csv_file, parquet_file and the DataTypes.DTypes schema are defined elsewhere.
pl.scan_csv(source=csv_file, schema=DataTypes.DTypes).sink_parquet(path=parquet_file)
Looking at the file sizes that were created shows a marked difference…
The drastic reduction in size means that uploading to an S3 bucket is considerably faster than with a CSV file – and the cost of storage in the bucket is commensurately less as well. The file above was compressed using Snappy; GZip would net even more savings.
What does Spark think?
Running the data in an Athena Spark notebook shows some interesting results. The code to do this is broadly similar, with some differences around column names and syntax. The following was used in place of the CSV based processing in the previous post.
from pyspark.sql.functions import col

# Read every Parquet file in the folder.
df = spark.read.parquet("s3://131273-rawdata/parquet/")
# Unique receiving callsigns (column "2").
dfRx = df.select("2").drop_duplicates()
uniqueRx = list(dfRx.select("2").toPandas()["2"])
# Keep only rows whose transmitting callsign (column "6") also appears as a receiver.
dfOutput = df.filter(col("6").isin(uniqueRx))
Data load time is more or less the same, even allowing for the required decompression, because all the data still needs loading – each stage ran in 8 seconds, which, to read in and decompress 13GB of data from an S3 bucket, is quite impressive. The same operation on my laptop, running some fairly quick SSDs, still took nearly 45 seconds.
The second step was to select the unique list of rx stations by means of a drop_duplicates. This was the quickest of all: under a second in both cases.
The third step is where the real savings were shown. For the CSV based solution, the select took 127 seconds.
Conversely, the Parquet solution meant that the select only had to traverse the column of interest and simply didn't need to read the rest of the data. It ran in 19 seconds.
This is a clear winner – over seven times faster, with a corresponding reduction in processing costs on the Spark data processing units. More important for the costs was the data scanned: in the first case there was an entire CSV file to read. Spark charges are $5 per TB scanned, which is about $0.005 per GB, so the 13.5GB CSV file cost about 6.5c to read. The Parquet approach reported 830MB scanned, for a total of about 0.4c. Now, 6 cents difference is hardly going to break the bank. But remember – that's for Every. Single. Query that you do. It all adds up.
Conclusion
Parquet files are a significant wealth enhancer – you can do a lot more with your budget, or conversely, you can shrink your running costs significantly just by moving to them from CSV files. Something to seriously consider.
I have a data processing problem – I have too much of it. Far, far too much… This is a post about exploring data, and how to process a lot of it, using Athena notebooks, which was a fun delve into a world I'd never really explored much before.
However, to get there, a certain amount of scene setting is needed. I must ask you to come on a journey of discovery…
The Problem
A project I follow with a certain amount of geeky interest is WSPRnet, which lurks at https://www.wsprnet.org/drupal/wsprnet/map. Essentially the idea is that a number of very low power radio transmitters send a beacon which other small radio receivers listen for. The results are collated on a web site and people can download the contact list.
The problem is that it has become, well, popular. Very popular. The file sizes in 2009, a year after it started, were on the order of 50MB a month. Come 2024, and we are knocking on the door of 15GB a month.
The other problem is that people are not using the system as it was designed, and it’s very difficult to fix the issue now. So the data we have is polluted… the old Big Data problem of Buggy Data… it needs cleaning up.
If we look at a typical dump of some of these CSV files we can see the structure
The first column is the spotID, which is a unique key into the database; the second is the UNIX timestamp. The third column is the callsign of the amateur radio station that is receiving – they are the ones that report the "spot" and upload it to the database. The seventh field is the callsign of the transmitting station.
All good. Apart from the slight issue that sometimes, people don’t actually put in the transmitting station and instead insert a message…
Here, we have a station that does nothing but receive – the SWL means they are a shortwave listening station. They have reported the transmitting station, AJ8S, correctly though. Worse, however, are stations that place incorrect details into the receiving callsign field, such as a location.
One solution is to clean the data by weeding out the incorrect transmit and receive callsigns. If we assume that any station that transmits also receives, we can list all the receive station callsigns and then keep only the spots whose transmitting station also appears in that list. In other words, we assume that all stations will be both transmitters and receivers and filter out the nonsensical stuff on either side.
This is easy to do with SQL – it's bread and butter for a database. We simply load all the CSV files into a table, select a distinct list of receive stations and then use that to select those stations that are also transmitting…
LOAD DATA INFILE '/var/lib/load/wsprspots-2024-12.csv' INTO TABLE spots2 FIELDS TERMINATED BY ',';
INSERT INTO prefix SELECT DISTINCT(rx) FROM spots2;
INSERT INTO spots SELECT spots2.* FROM spots2 INNER JOIN prefix ON spots2.tx = prefix.pfx;
We load the datafile into spots2, the SELECT DISTINCT makes sure we have a list of all the receiving stations with each appearing only once, and then we use that to select the final list into spots using a standard inner join.
The only problem? It takes…. forever…..
It is fast, assuming you have enough memory. However, as the size of the monthly files increases, the speed falls dramatically. My database server at home has 128GB of RAM, 32 CPUs and a lot of fast disk – it is entirely memory bound. As soon as the working set gets too large for the memory, it slows down considerably.
2009-01 started 03:54:06
2009-02 started 03:56:34
2009-03 started 03:59:11
2009-04 started 04:03:03
2009-05 started 04:07:42
.
.
2024-07 started 12:33:38
2024-08 started 13:54:58
2024-09 started 15:09:49
2024-10 started 16:37:41
2024-11 started 18:25:49
2024-12 started 20:14:00
It takes minutes for the first months from 2009 to run, and many hours for the later ones. Clearly, a better way would be preferable.
Panda bears…
Modern data processing and analytics runs on Python, using a framework called Pandas. If you have ever used the language R, this is easy enough to understand, and it isn't much more difficult if you haven't.
Here, we import the necessary libraries, pandas and PyArrow, and then create a dataframe called dfRaw to hold the raw data, which is loaded in from disk. If you are using Windows, don't put backslashes into the path or it gets messy – Python will happily take "normal" *nix forward slashes in the path and not get confused. Loading the dataframe into memory takes a couple of seconds.
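A minimal sketch of that load step, with the filename as a placeholder for whichever monthly dump you are working on:

>>> import pandas as pd
>>> import pyarrow  # imported alongside pandas in the session
>>> dfRaw = pd.read_csv("wsprspots-2009-01.csv", header=None)  # the WSPR dumps have no header row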
We can examine the size with the shape property and see it is about half a million records and 14 columns
>>> dfRaw.shape
(583455, 14)
Now, we pick a list of unique rx callsigns. Pandas has a convenient method to do this..
>>> dfUniqueRx = dfRaw[2].unique()
It really is that simple. We can look at the list of the callsigns by calling print…
So we now have the raw data loaded in and another dataframe holding a single column of data which is the unique callsign list. We can now use some of the powerful querying and filtering that Pandas has.
A single line that needs a little unpicking. We are going to load the results into a new dataframe called dfCleaned. We do this by taking our raw data in dfRaw and specifying that, on column 6 – that is the seventh column (because we start counting from zero in Python) – we want to pick only values that are in the list of unique callsigns.
The .loc indexer selects the rows to keep, based on column 6, and we apply the .isin filter and supply the list of callsigns from dfUniqueRx. It is that simple.
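That single line looks something like this – a sketch, assuming the default integer column names from the header-less load:

>>> dfCleaned = dfRaw.loc[dfRaw[6].isin(dfUniqueRx)]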
>>> dfCleaned.shape
(552314, 14)
>>>
We can see the end result has shrunk from 583,455 to 552,314 rows, so we discarded 31,141 rows.
The end result looks like what we would expect, and we can dump this dataframe out to a CSV file.
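Again, a one-line sketch, with a placeholder output filename:

>>> dfCleaned.to_csv("wsprspots-2009-01-cleaned.csv", header=False, index=False)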
The entire process takes seconds, and is far faster than the disk based process of a SQL database. When running on my laptop, there was less than a second of processing at each stage after hitting Enter…
So… what's the problem?
Memory.
All is good until you run out of memory. At this point, pandas will start throwing exceptions, because everything has to fit into memory – and if it's bigger than system RAM, it won't play ball. Note that the process above, which has a raw dataframe and a cleaned one, uses memory inefficiently because you are holding the data twice over.
With the 2009 datasets, the half million rows are only a few MB in total. Looking at the later data from late 2024, we have raw data sizes of 16GB. On a laptop with 32GB of RAM, this won't fit twice over and the system runs out of space.
The solution is a distributed computing system that specialises in really big data… and fortunately there is one, called Apache Spark. The problem is that Spark needs a lot of work to set up and use, and it needs a lot of understanding. Spark workloads will scale across multiple compute nodes, distributing the compute requirements beyond what any one node can provide. Sounds ideal…
Spark of life
Amazon Web Services finally enters the discussion – so I hope you all stuck with me. AWS has a service called Athena which, along with AWS Glue, provides serverless analytics for processing big data. You simply load in your data via an S3 bucket, and then let Athena do the heavy lifting.
Athena has two main ways of processing data. One is via SQL queries, which are available in all regions where you can get Athena, as far as I know. The Spark interface however is NOT available in all AWS regions – it is not yet available in London (eu-west-2), for example, and you will need to use Dublin (eu-west-1) to make use of it. Something to consider if data sovereignty is a thing for you…
First things first – let's get the data into the bucket. I created a bucket called 131273-rawdata in the AWS console, making sure this was in the eu-west-1 region.
Getting data in is easy – just use the AWS CLI, which is in my opinion a wonderfully underrated interface. It does have a few quirks about wildcards though… Copying a single file is easy.
D:\python>aws s3 cp wsprspots-2009-01.csv s3://131273-rawdata/wspr
upload: .\wsprspots-2009-01.csv to s3://131273-rawdata/wspr
Copying multiple files the way you would expect, however, has an issue.
D:\python>aws s3 cp wsprspots-2009-??.csv s3://131273-rawdata/wspr
The user-provided path wsprspots-2009-??.csv does not exist.
The correct way is to copy all the files with a period marker and then specify exclude and include rules…
D:\python>aws s3 cp . s3://131273-rawdata/wspr/ --recursive --exclude "*" --include "wsprspots-2009-*.csv" --dryrun
(dryrun) upload: .\wsprspots-2009-01.csv to s3://131273-rawdata/wspr/wsprspots-2009-01.csv
(dryrun) upload: .\wsprspots-2009-02.csv to s3://131273-rawdata/wspr/wsprspots-2009-02.csv
.
.
.
(dryrun) upload: .\wsprspots-2009-11.csv to s3://131273-rawdata/wspr/wsprspots-2009-11.csv
(dryrun) upload: .\wsprspots-2009-12.csv to s3://131273-rawdata/wspr/wsprspots-2009-12.csv
This copies all files from the . folder, i.e. the current folder, and recurses through all files. It then excludes everything, so nothing is copied, and then the final include permits some files to be copied. The --dryrun marker is essential for testing. For more information on the s3 cp wildcards, see https://docs.aws.amazon.com/cli/latest/reference/s3/#use-of-exclude-and-include-filters
Once we have the data uploaded, open the AWS console and navigate to Athena
We need to start up an Apache Spark session, which needs a bit of setup work. Click on the hamburger in the top left and click on Workgroups.
Now on the right, click on the yellow Create workgroup button and be sure to select the Apache Spark engine.
You should then have an Apache Spark enabled workgroup.
Athena can read files in from a standard S3 bucket, but putting data back into a bucket needs a little more work, as it doesn't have the needed permissions. To resolve this, open up the workgroup page and find the relevant Role ARN.
Click on the link and you will end up in the IAM console.
To add the extra permissions, we need to click on the Add permissions button under Permissions policies.
Specify an Inline policy and then click the JSON tab
This will show the permissions.
Delete all of the JSON in the Policy Editor, and replace with the following
This will give a set of reasonably robust permissions that will let Athena write out to any of the S3 buckets you possess – if you want to limit this, you need to specify particular buckets in the Resource section.
Click on Next, then on the next screen add a name for the policy and click Create policy.
In Athena, navigate on the left to the notebook editor
and then on the right, create a new notebook by clicking the plus icon
Once a notebook is created it will have access to up to 20 DPUs – these are data processing units that Spark will spread work over if needed to improve performance. Doing so can be expensive – DPUs cost about $0.55 per hour, on top of data scanning charges of $5 per terabyte read. It is possible to limit the scale out of DPUs, although a minimum of three must be available.
In the notebook you can add lines, which are known as cells. Each cell is sticky – the results stay there for reuse by subsequent commands.
Here is an example of a notebook in development; the commands are run individually to evaluate the output.
Spark uses a similar structure and way of processing to Pandas, although it is definitely not the same, as it has to account for scale out across nodes.
Line 1 reads in from the S3 bucket. Conveniently, if all the files are placed in the same folder, it will read all the CSV files sequentially and process them, which is a useful trick.
Line 2 drops the duplicates to get the unique list of receiving stations, in a similar fashion to the Pandas example earlier.
Line 3 is a little bit of a hack. You cannot use one Spark dataframe inside another Spark dataframe, so we convert it to a Pandas frame. This can then be used in line 4 to select the final list.
Finally, line 5 writes out the results to the S3 bucket – the coalesce(1) ensures that it writes one file rather than chunking the output into a series of smaller files.
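Put together, the five cells look something like this – a sketch rather than the exact notebook, assuming header-less CSVs (so Spark names the columns _c0, _c1, …) and a made-up output prefix:

from pyspark.sql.functions import col

df = spark.read.csv("s3://131273-rawdata/wspr/")                 # line 1: read every CSV in the folder
dfRx = df.select("_c2").drop_duplicates()                        # line 2: unique receiving callsigns
uniqueRx = list(dfRx.toPandas()["_c2"])                          # line 3: convert to a plain Python list
dfOutput = df.filter(col("_c6").isin(uniqueRx))                  # line 4: keep rows whose tx callsign also receives
dfOutput.coalesce(1).write.csv("s3://131273-rawdata/cleaned/")   # line 5: write a single output file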
Apart from the write out, the entire process takes less than 30 seconds to process an entire year's data.
Conclusions
Spark on Athena is a quick, easy and simple processing resource to set up and use. It can easily process data in and out through S3 buckets.
Cost can be an issue – to process a TB of data is going to cost $5 to scan it. If the query is run twice, it costs twice as much – and this is in addition to the compute costs. It is essential to be sure the queries are going to work without problems.
Local development is therefore essential, which means you have to have a small Apache Spark setup to develop against. This is possible using Python libraries, but it is an additional layer of complexity. Small jobs can be prototyped inside Athena if a small sample dataset is available, as data scanning seems to contribute most of the cost of Athena.
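For what it is worth, a minimal local setup is only a pip install away – a sketch, assuming pyspark installed from PyPI and a small sample file to hand:

from pyspark.sql import SparkSession

# Local session using all available cores; no cluster required.
spark = SparkSession.builder.master("local[*]").appName("wspr-prototype").getOrCreate()

# The same read/filter code from the notebook can then be exercised on a sample file.
df = spark.read.csv("wsprspots-2009-01.csv")
print(df.count())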
The benefits are large though – you can make use of a hardware setup that would cost upwards of tens of thousands of dollars, for quite a modest outlay. For irregular jobs, it can be quite cost efficient to consume Athena resources for the processing ability they give.