Author Archives: Saeed Barghi

About Saeed Barghi

Senior Big Data and Cloud Consultant based in Melbourne, Australia

AWS Glue Part 3: Automate Data Onboarding for Your AWS Data Lake

When it comes to building data lakes in AWS s3, it makes even more sense to use Spark. Why? Because you can take advantage of Glue and build ETL jobs that generate and execute Spark for you, serverless. It means you won't need to worry about building and maintaining EMR clusters, or scaling them up and down depending on which job runs when. Glue takes care of all of it for you.

AWS Glue

In part one and part two of my posts on AWS Glue, we saw how to create crawlers to catalogue our data and then how to develop ETL jobs to transform them. Here we’ll see how we can use Glue to automate onboarding new datasets into data lakes.

On-board New Data Sources Using Glue

On-boarding new data sources can be automated using Terraform and AWS Glue. By onboarding I mean having them traversed and catalogued, converting the data to formats that are more efficient to query with engines like Athena, and creating tables for the transformed data.

Below is the list of what needs to be implemented. Note that Terraform doesn't fully support AWS Glue yet, so some steps need to be implemented manually. See here for more information.

1- Create the s3 folder structure using Terraform (resource “aws_s3_bucket_object”). There are 2 folder structures that need to be created:

    a- The structure that matches the pattern in which data lands, for example: s3://my_bucket/raw_data/data_source_name/table_name/. You can create multiple folders here, one per table that you're onboarding.

    b- The structure to store data after it is transformed: s3://my_bucket/processed_data/data_source_name/table_name/.

2- Create a new database, using Terraform, for the source being on-boarded. You can create this database in Glue (Terraform resource “aws_glue_catalog_database”) or in Athena (resource “aws_athena_database”). I couldn't see any difference when I tried both options.

3- Create a new Crawler for the new data source (Terraform doesn't support Glue Crawlers yet, so do this step manually until this issue is closed). This is the crawler responsible for inferring the structure of the data landing in s3, cataloguing it, and creating tables in Athena.

    a- The crawler should point to the source's raw folder and write its tables to the database created in step 2. In the example above, it should point to s3://my_bucket/raw_data/data_source_name/

    b- The crawler will create one table in the Athena database per subfolder under the s3 path it points to (these tables will be used as sources in ETL jobs later). In other words, we need multiple folders under the source folder in s3, but only one crawler in Glue.

    c- Prefix the table names to specify the table type, in this case raw, e.g. “raw_”.

Note that the tables created by this crawler are only for storing metadata. They won't be used by users or data engineers to query data; we'll create another set of tables for that in step 5.

4- Create a new Glue ETL job using Terraform

    a- Specify a schedule according to the frequency at which data lands in s3

    b- The ETL job will read the data in the raw folder, convert it to Parquet (or any other columnar format like ORC), and store it in the processed folder (see the sketch after this list)

5- Create a new Crawler to catalogue the transformed data (again, you need to do this step manually for now)

    a- Its schedule should match that of the ETL job in step 4, to make sure the data processed and transformed by the ETL job is available for queries as soon as possible.

    b- It will create a table in Athena, in the same database as the source table

    c- Prefix the table names, e.g. “processed_”
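
To make step 4 concrete, below is a minimal sketch of what the ETL job does, written as plain Spark code rather than the script Glue generates for you. The bucket and prefix names are the illustrative ones used above, and the source is assumed to land as JSON; adjust the read format to whatever your data actually arrives as.

import org.apache.spark.sql.SparkSession

object RawToProcessedSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("RawToProcessedSketch").getOrCreate()

    // Read raw data as it lands in s3 (one subfolder per table, see step 1a)
    val raw = spark.read.json("s3://my_bucket/raw_data/data_source_name/table_name/")

    // Write it back out as Parquet under the processed prefix (step 1b), so Athena scans less data
    raw.write.mode("overwrite").parquet("s3://my_bucket/processed_data/data_source_name/table_name/")

    spark.stop()
  }
}

The crawler from step 5 then picks up this Parquet output and creates the “processed_” table in the source's database.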

 

By following the steps above, we have a repeatable data on-boarding process that we can take from one environment to another in a matter of minutes. A very obvious use case would be to move from non-prod to prod after each source/table is tested and verified, just by pointing our Terraform scripts at the new environment.

I hope this post helps, and please do not hesitate to give me feedback via comments.


AWS Glue Part 2: ETL your data and query the result in Athena

In part one of my posts on AWS Glue, we saw how Crawlers could be used to traverse data in s3 and catalogue them in AWS Athena.

Glue is a serverless service that can be used to create ETL jobs, and to schedule and run them. In this post we'll create an ETL job using Glue, execute the job, and then see the final result in Athena. We'll go through the details of the generated code in a later post.

For the purpose of this tutorial I am going to use Glue to flatten the JSON returned by calling the Jira API. It's a long and complex JSON response; you can see what it looks like here. We had to do it recently at work and it took 2 analysts 2 days to understand the structure and list out all the fields. Using Glue, it'll take 15 minutes!

Note that if your JSON file contains arrays and you want to be able to flatten the data inside them, you can use jq to get rid of the arrays and keep all the data in plain JSON format. More about jq here.
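
Alternatively, if you'd rather stay inside Spark than pre-process with jq, the explode function achieves a similar result by turning each array element into its own row. Below is a minimal sketch, assuming a hypothetical comments array on the issue; the field paths are illustrative, not necessarily what your Jira response contains.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object ExplodeArraysSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ExplodeArraysSketch").getOrCreate()
    import spark.implicits._

    val issues = spark.read.json("s3://my_bucket/raw_data/jira/issues/")

    // One row per array element; the struct inside can then be flattened like any other column
    val comments = issues.select($"key", explode($"fields.comment.comments").as("comment"))
    comments.select($"key", $"comment.body").show()
  }
}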

Let’s get started:

1. Navigate to the AWS Glue console and click on Jobs under ETL in the left-hand pane

2. Click on the Add job button to kick off the Add job wizard

3. Fill in the job properties. Most of them are self-explanatory:

a. Provide a name.

b. Choose a role that has full Glue access as well as access to the s3 buckets this job is going to read data from, write results to, and save its generated Spark script to.

c. Specify whether you're going to use the Glue interface to develop the basics of your job, have it run an existing script that is already pushed to s3, or start writing the Spark code from scratch.

In this example we’ll select option 1, to have Glue generate the script for us. We get the option to edit it later, if need be.

d. Specify the s3 buckets where your script will be saved for future use and where temporary data will be stored:

etl_job_properties

4. Select where your source data is. This section lists the tables in Athena databases that the Glue role has access to. We’ll use the table we created in part one:

etl choose source

5. Next step? You guessed it: choosing the target for your ETL job. I want to store the result of my job as a new table, convert my JSON to Parquet (since it's faster and less expensive for Athena to query data stored in a columnar format), and specify where in s3 I want my result to be stored:

etl choose target

6. Here's the exciting part. Glue matches all the columns in the source table to columns in the target table it's going to create for us. This is where we can see what our JSON file actually looks like and flatten it by taking the columns we're interested in out of their respective JSON structs:

a. Expand fields, issuetype and project:

etl map source to dest

b. Remove all the unwanted columns by clicking on the cross button next to them on the Target side. We can then add the ones we want in our flattened output one by one, by clicking Add column at the top right and mapping columns in the source to the new ones we just created (a plain-Spark sketch of this flattening follows below):

etl map source to dest 2
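
To give an idea of what this mapping produces, here is a minimal sketch of the same flattening written as plain Spark code rather than the script Glue generates. The nested paths (fields.issuetype.name, fields.project.key) are illustrative Jira-style fields, not necessarily the exact columns in your response.

import org.apache.spark.sql.SparkSession

object FlattenJiraJson {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("FlattenJiraJson").getOrCreate()
    import spark.implicits._

    // Read the raw Jira JSON landed in s3
    val issues = spark.read.json("s3://my_bucket/raw_data/jira/issues/")

    // Pull the nested struct members we care about up into top-level columns
    val flat = issues.select(
      $"id",
      $"key",
      $"fields.issuetype.name".as("issue_type"),
      $"fields.project.key".as("project_key")
    )

    // Store the flattened result as Parquet, ready to be crawled and queried in Athena
    flat.write.mode("overwrite").parquet("s3://my_bucket/processed_data/jira/issues/")

    spark.stop()
  }
}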

7. Click Finish

8. The next page you'll see is Glue's script editor. Here you can review the Spark script generated for you and either run it as it is or make changes to it. For now we're going to run it as it is. Click on the Run job button. You'll be asked to provide job parameters; put in 10 for the number of concurrent DPUs and click on Run job:

etl run job

Wait for the job to finish and head to the location in s3 where you stored the result. You’ll see a new file created there for you:

etl result s3

Now that we have our data transformed and converted to Parquet, it's time to make it available for SQL queries. If you went through my first post on Glue, you'd know the answer is to use Crawlers to create the table in Athena. Follow those steps, create a crawler, and have your table available to be queried using SQL. I have done that, and this is what my result looks like for what we did together in this post:

etl_athena

Easy, right? You don't have to worry about provisioning servers, installing the right software and versions on them, and then competing with other applications to acquire resources. That is the power of serverless services offered by cloud providers, which I personally find very useful as well as time- and cost-saving.

 


AWS Glue Part 1: Discover and Catalogue Data Stored in s3

AWS Glue

Glue is a fully managed extract, transform, and load (ETL) service offered by Amazon Web Services. Glue discovers your data (stored in S3 or other databases) and stores the associated metadata (e.g. table definition and schema) in the Glue Data Catalog. Once cataloged, your data is immediately searchable, queryable, and available for ETL.

Once your ETL job is ready, you can schedule it to run on Glue’s fully managed, scale-out Apache Spark environment. It provides a flexible scheduler with dependency resolution, job monitoring, and alerting.

Glue provides out-of-the-box integration with Amazon Athena, Amazon EMR, Amazon Redshift Spectrum, and any Apache Hive Metastore-compatible application.

Discover Data Using Crawlers

AWS Glue is able to traverse data stores using Crawlers and populate data catalogues with one or more metadata tables. These tables can later be used by ETL jobs as sources or targets.

Below are the steps to add a crawler to analyse and catalogue data in an s3 bucket:

1. Sign in to the AWS Management Console and open the AWS Glue console. Choose the Crawlers tab.

2. Choose Add crawler; it'll launch the Add crawler wizard. Follow the wizard:

a. Specify a name and description for your crawler.

b. Add a data store. Here you have options to specify an s3 bucket or a JDBC connection. After selecting s3, select the option “Specified path in my account” and click the folder icon next to “Include path” to select where the data to be crawled is:

Crawler Add Data Source

c. You can add another data source, in case you want to join data from 2 different places together:

Crawler Add Another Datasource

d. Choose an IAM role that has permissions to work with Glue. This role should have full access to run Glue jobs as well as access to the s3 buckets it reads data from and stores scripts in:

Crawler Choose IAM Role

e. Create a schedule for your Crawler. You can have it run on demand or choose one of the options in the drop-down:

Crawler Schedule

f. The next step is to choose the location where the output from your crawler will be stored. This is a database in Athena, and you can prefix the names of the tables created by your crawler so they're easily distinguishable from other tables in the database:

Crawler Configure Output

g. Review your crawler’s settings and click on Finish. You’ll be redirected to the main Crawlers page, where your crawler is listed.

h. Click on “Run it now?”:

Crawlers Main 2

When the crawler finishes running, go to the Athena console and check that your table is there:

Athena Source Table

Examine the table's DDL: it's an external table pointing to the location in s3 that your Crawler “crawled”. Now start writing queries on it. It's the first table you created using Glue crawlers. The first of many. 🙂

 


Airflow & Celery on Redis: when Airflow picks up old task instances

This is going to be a quick post on Airflow. We realized that in one of our environments, the Airflow scheduler was picking up old task instances that were already a success (whether marked as success or completed successfully). You can verify this is actually your issue by ssh'ing into your Airflow workers and running:

ps -ef | grep airflow

And check the DAG Run IDs: most of them are for old runs.

This happens when Celery's backend, in our case Redis, has old (or duplicate) keys for task runs. So the solution is to clear the Celery queue. Here are the steps to do it when Celery runs on Redis:

1- Stop Airflow Scheduler:

sudo initctl status airflow-scheduler

sudo initctl stop airflow-scheduler

2- Stop webserver:

sudo initctl status airflow-webserver

sudo initctl stop airflow-webserver

3- Stop Celery Flower:

cd /var/lib/airflow/bin

./airflow.sh flower status

./airflow.sh flower stop

4- Stop workers:

cd /var/lib/airflow/bin

./airflow.sh worker status

./airflow.sh worker stop

Now ssh into the server where Redis is running, type “redis-cli” and press Enter to get into the Redis CLI. Follow the steps below to flush the Redis DB:

  1. INFO keyspace: list keyspaces

    a. You should get only 1 result back

  2. SELECT 0: select the database
  3. CONFIG GET dir: get the database file location so you can take a backup
  4. Copy the file “xxxx.db” from that location to your home directory
  5. FLUSHDB: flush the database

Now you can start all Airflow services:

1- Scheduler commands

sudo initctl start airflow-scheduler

sudo initctl status airflow-scheduler

2- Webserver commands

sudo initctl start airflow-webserver

sudo initctl status airflow-webserver

3- Flower commands

cd /var/lib/airflow/prd/bin

nohup ./airflow.sh flower start &

./airflow.sh flower status

4- Worker commands

cd /var/lib/airflow/prd/bin

nohup ./airflow.sh worker start &

./airflow.sh worker status

 

Go back to Airflow and validate all DAGs are starting and completing successfully.

And live happily ever after! 🙂


How to import spark.implicits._ in Spark 2.2: error “value toDS is not a member of org.apache.spark.rdd.RDD”

I wrote about how to import implicits in Spark 1.6 more than 2 years ago. But things have changed in Spark 2.2: the first thing you need to do when coding in Spark 2.2 is to set up a SparkSession object. SparkSession is the entry point to programming Spark with the DataSet and DataFrame APIs.

Like Spark 1.6, spark.implicits must be imported to be able to use Spark's API for DataSets and DataFrames in version 2.2. And like version 1.6, an instance of SparkContext is needed in Spark 2.2 before you're able to import spark.implicits. Since each instance of SparkSession comes with an instance of SparkContext associated with it, all you have to do is create a SparkSession object and you're set.

I have seen other posts that mention bits and pieces of how to do it. Here I give you the full code that works just fine, and you can tweak it based on your requirements:

import org.apache.spark.sql._
import org.apache.log4j._

object sparkSQLWithCaseClass {

  // age is included so the SQL filter below has a column to work with;
  // people.csv is assumed to contain ID, name and age on each line
  case class Person(ID: Int, name: String, age: Int)

  def mapper(l: String): Person = {
    val fields = l.split(',')
    Person(fields(0).toInt, fields(1), fields(2).trim.toInt)
  }

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder.appName("Spark SQL").getOrCreate()

    val lines = spark.sparkContext.textFile("../../people.csv")
    val people = lines.map(mapper)

    import spark.implicits._
    val schemaPeople = people.toDS()

    schemaPeople.printSchema()
    schemaPeople.createOrReplaceTempView("people")

    val t = spark.sql("select * from people where age >= 13")
    val res = t.collect()
    res.foreach(println)

    spark.stop()
  }
}


Spark Error “java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE” in Spark 1.6

RDDs are the building blocks of Spark and what makes it so powerful: they are stored in memory for fast processing. RDDs are broken down into partitions (blocks) of data, each a logical piece of the distributed dataset.

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of the block to 2 GB.

In brief, this error means that the block size for the resulting RDD is larger than 2GB: https://issues.apache.org/jira/browse/SPARK-1476

One way to work around this issue is to increase the application's parallelism. We can define the default number of partitions in RDDs returned by operations like join and reduceByKey by adjusting

spark.default.parallelism

What this configuration parameter does is define how many blocks (partitions) of data our dataset, in this case an RDD, is going to be divided into.

As you have probably realized by now, we need to set spark.default.parallelism to a higher value when processing large datasets. This way we can make sure the size of individual data blocks does not exceed the 2 GB limit.
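
As a minimal sketch (assuming a Spark 1.6-style RDD application), this is what setting the parallelism looks like in code; the value 2000 and the input path are purely illustrative and should be chosen based on your data volume and cluster size.

import org.apache.spark.{SparkConf, SparkContext}

object ParallelismSketch {

  def main(args: Array[String]): Unit = {
    // Shuffle operations such as join and reduceByKey will now produce 2000 partitions by default
    val conf = new SparkConf()
      .setAppName("ParallelismSketch")
      .set("spark.default.parallelism", "2000")
    val sc = new SparkContext(conf)

    // For an existing RDD you can also increase the partition count explicitly
    val data = sc.textFile("hdfs:///data/large_dataset")
    val repartitioned = data.repartition(2000)

    println(repartitioned.getNumPartitions)
    sc.stop()
  }
}

The same setting can also be passed at submit time with --conf spark.default.parallelism=2000 instead of hard-coding it.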


Spark Error CoarseGrainedExecutorBackend Driver disassociated! Shutting down: Spark Memory & memoryOverhead

Another common error we saw in YARN application logs was this:

17/08/31 15:58:07 WARN CoarseGrainedExecutorBackend: An unknown (datanode-022:43969) driver disconnected.

17/08/31 15:58:07 ERROR CoarseGrainedExecutorBackend: Driver 10.1.1.111:43969 disassociated! Shutting down.

Googling this error suggests increasing spark.yarn.driver.memoryOverhead or spark.yarn.executor.memoryOverhead or both. That has apparently worked for a lot of people. Or at least those who were smart enough to understand how these properties work.

What you need to consider here is that memoryOverhead is allocated out of the total amount of memory available to driver or executor, which is controlled by spark.driver.memory & spark.executor.memory.

What this means is that if you're increasing the executor's or driver's memoryOverhead, double-check whether there is enough memory allocated to the driver and executor. In our case, the user was allocating all of the memory available to the driver (8 GB) as memoryOverhead (8192 MB), which meant there was none left for other driver operations:

spark-submit \
--queue default \
--verbose \
--master yarn-cluster \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.manager=sort \
--conf spark.executor.memory=8g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=8g \
--conf spark.network.timeout=600s \
--conf spark.scheduler.executorTaskBlacklistTime=3600000 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.executor.memoryOverhead=8192 \

You can clearly see what I meant in the paragraph above. Instead of doing this, the user should have increased executor and driver memory in line with the increase in memory overhead:

spark-submit \
--queue default \
--verbose \
--master yarn-cluster \
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.manager=sort \
--conf spark.executor.memory=16g \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=10 \
--conf spark.executor.cores=2 \
--conf spark.driver.memory=16g \
--conf spark.network.timeout=600s \
--conf spark.scheduler.executorTaskBlacklistTime=3600000 \
--conf spark.yarn.driver.memoryOverhead=8192 \
--conf spark.yarn.executor.memoryOverhead=8192 \