Category Archives: Business Intelligence

How to import spark.implicits._ in Spark 2.2: error “value toDS is not a member of org.apache.spark.rdd.RDD”

I wrote about how to import implicits in Spark 1.6 more than two years ago, but things have changed in Spark 2.2: the first thing you need to do when coding against Spark 2.2 is to set up a SparkSession object. SparkSession is the entry point to programming Spark with the Dataset and DataFrame APIs.

As in Spark 1.6, spark.implicits are required to be able to use Spark's Dataset and DataFrame APIs in version 2.2. And as in 1.6, an instance of SparkContext is needed before you can import spark.implicits. Since each instance of SparkSession comes with an instance of SparkContext associated with it, all you have to do is create a SparkSession object and you're set.

I have seen other posts that mention bits and pieces of how to do it. Here is the full code that works just fine, which you can tweak based on your requirements:

import org.apache.spark.sql._
import org.apache.log4j._

object sparkSQLWithCaseClass {

  // age is included so the SQL filter on age below has a column to work with;
  // this assumes each row of the CSV looks like: 1,John,33
  case class Person(ID: Int, name: String, age: Int)

  def mapper(l: String): Person = {
    val fields = l.split(',')
    Person(fields(0).toInt, fields(1), fields(2).toInt)
  }

  def main(args: Array[String]) {

    // keep the console output readable by logging errors only
    Logger.getLogger("org").setLevel(Level.ERROR)

    // SparkSession is the single entry point in Spark 2.2; it carries a SparkContext with it
    val spark = SparkSession.builder.appName("Spark SQL").getOrCreate()

    val lines = spark.sparkContext.textFile("../../people.csv")
    val people = lines.map(mapper)

    // the import that fixes "value toDS is not a member of org.apache.spark.rdd.RDD";
    // it needs a concrete SparkSession instance (here, spark) to already be in scope
    import spark.implicits._
    val schemaPeople = people.toDS()

    schemaPeople.printSchema()
    schemaPeople.createOrReplaceTempView("people")

    val t = spark.sql("select * from people where age >= 13")
    val res = t.collect()
    res.foreach(println)

    spark.stop()
  }
}
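If you run this inside spark-shell instead, the session already exists as spark, so import spark.implicits._ works directly. To run it as a standalone application you would package it and hand it to spark-submit; a minimal sketch, assuming an sbt build (the jar name and path below are made up):

sbt package
spark-submit --class sparkSQLWithCaseClass target/scala-2.11/spark-sql-with-case-class_2.11-1.0.jar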


Spark Error “java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE” in Spark 1.6

RDDs are the building blocks of Spark and what makes it so powerful: they are stored in memory for fast processing. RDDs are broken down into partitions (blocks) of data, each a logical piece of the distributed dataset.

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of the block to 2 GB.

In brief, this error means that the size of a block in the resulting RDD is larger than 2 GB: https://issues.apache.org/jira/browse/SPARK-1476

One way to work around this issue is to increase the application's parallelism. We can define the default number of partitions in RDDs returned by operations like join and reduceByKey by adjusting

spark.default.parallelism

What this configuration parameter does is define how many blocks of data our dataset, in this case an RDD, is going to be divided into.

As you have probably realized by now, we need to set spark.default.parallelism to a higher value when processing large datasets. This way we can make sure the size of individual data blocks does not exceed the 2 GB limit.
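To make this concrete, here is a minimal Scala sketch of the two usual knobs, with illustrative numbers (2000 partitions and the input path are made up; pick whatever keeps each partition comfortably under 2 GB):

import org.apache.spark.{SparkConf, SparkContext}

// raise the default number of partitions used by shuffles such as join and reduceByKey
val conf = new SparkConf()
  .setAppName("MoreParallelism")
  .set("spark.default.parallelism", "2000")
val sc = new SparkContext(conf)

// or override the partition count explicitly on a specific operation
val counts = sc.textFile("hdfs:///some/large/input")      // path is made up
  .map(line => (line.split(',')(0), 1))
  .reduceByKey(_ + _, 2000)                                // 2000 output partitions

The same idea applies to repartition(): more partitions means smaller blocks, which is what keeps each block under the ByteBuffer limit.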


Spark Error CoarseGrainedExecutorBackend Driver disassociated! Shutting down: Spark Memory & memoryOverhead

Another common error we saw in YARN application logs was this:

17/08/31 15:58:07 WARN CoarseGrainedExecutorBackend: An unknown (datanode-022:43969) driver disconnected.

17/08/31 15:58:07 ERROR CoarseGrainedExecutorBackend: Driver 10.1.1.111:43969 disassociated! Shutting down.

Googling this error suggests increasing spark.yarn.driver.memoryOverhead or spark.yarn.executor.memoryOverhead, or both. That has apparently worked for a lot of people, or at least for those who understood how these properties actually work.

What you need to keep in mind is that memoryOverhead is off-heap memory that YARN allocates per container on top of the heap controlled by spark.driver.memory and spark.executor.memory, and it is normally sized as a modest fraction of that heap (by default, the larger of 10% of it or 384 MB).

What this means is that if you're increasing the executor's or driver's memoryOverhead, double-check that the driver and executor heaps are still sized sensibly relative to it. In our case, the user had set memoryOverhead as large as the entire driver heap, leaving the overhead completely out of proportion with the memory available for actual driver work:

spark-submit \
  --queue default \
  --verbose \
  --master yarn-cluster \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.shuffle.manager=sort \
  --conf spark.executor.memory=8g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.executor.cores=2 \
  --conf spark.driver.memory=8g \
  --conf spark.network.timeout=600s \
  --conf spark.scheduler.executorTaskBlacklistTime=3600000 \
  --conf spark.yarn.driver.memoryOverhead=8192 \
  --conf spark.yarn.executor.memoryOverhead=8192 \

You can clearly see what I meant in the paragraph above. Instead of doing this, the user should have increased the executor and driver memory in line with the increase in memory overhead:

spark-submit \
  --queue default \
  --verbose \
  --master yarn-cluster \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.shuffle.manager=sort \
  --conf spark.executor.memory=16g \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=10 \
  --conf spark.executor.cores=2 \
  --conf spark.driver.memory=16g \
  --conf spark.network.timeout=600s \
  --conf spark.scheduler.executorTaskBlacklistTime=3600000 \
  --conf spark.yarn.driver.memoryOverhead=8192 \
  --conf spark.yarn.executor.memoryOverhead=8192 \
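As a quick sanity check, you can work out the container size YARN will actually request for each executor: roughly the heap plus the overhead. A small sketch of that arithmetic using the numbers from the corrected command above:

// rough per-executor container size on YARN: heap + memoryOverhead
val executorMemoryMb = 16 * 1024   // spark.executor.memory=16g
val overheadMb       = 8192        // spark.yarn.executor.memoryOverhead=8192
val containerMb      = executorMemoryMb + overheadMb
println(s"YARN requests roughly $containerMb MB per executor container")   // about 24 GB

If that total is bigger than what a single NodeManager can offer (yarn.nodemanager.resource.memory-mb), the executor will never be scheduled, so keep an eye on that limit too.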

 


Spark Error: Failed to Send RPC to Datanode

This past week we had quite a few issues with users not being able to run Spark jobs in YARN cluster mode. In particular, a team that was on a tight schedule kept getting errors like this all the time:

java.io.IOException: Failed to send RPC 8277242275361198650 to datanode-055: java.nio.channels.ClosedChannelException

Mostly accompanied by error messages like:

org.apache.spark.SparkException: Error sending message [message = Heartbeat(9,[Lscala.Tuple2;@e47ba81,BlockManagerId(9, datanode-50 , 43381))]

ERROR Executor: Exit as unable to send heartbeats to driver more than 60 times

These errors basically mean the connection between the Spark driver and the executors is broken, mainly because an executor has been killed. This can happen for a number of reasons:

  1. We realized this happens a lot more often when our cluster is too busy and has hit maximum usage. What it means is that executor containers are allocated on the DataNodes, but they fail to acquire enough memory there and therefore get killed.
  2. Metaspace attempts to grow beyond the executor (JVM) memory limits, resulting in the loss of executors. The best way to stop this error from appearing is to set the properties below when launching spark-shell or submitting the application with spark-submit (a full command-line example follows this list):

     spark.driver.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m

     spark.executor.extraJavaOptions = -XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m

     Please note that depending on your project and code, you may need to increase the values mentioned above.

  3. The network is slow for whatever reason. In our case, this was caused by a change in DNS which resulted in caching being turned off. This case can be fixed by adjusting spark.executor.heartbeatInterval and spark.network.timeout (also shown in the example below). The default values for these 2 parameters are 10s and 120s respectively. You can adjust them based on how your network behaves; the only point to keep in mind is that the latter property, spark.network.timeout, should be greater than the former.
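Putting points 2 and 3 together, here is a minimal sketch of how those properties would be passed on the command line; all the values are illustrative, and the application class and jar are left out:

spark-submit \
  --master yarn-cluster \
  --conf "spark.driver.extraJavaOptions=-XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m" \
  --conf "spark.executor.extraJavaOptions=-XX:ReservedCodeCacheSize=100M -XX:MaxMetaspaceSize=256m -XX:CompressedClassSpaceSize=256m" \
  --conf spark.executor.heartbeatInterval=30s \
  --conf spark.network.timeout=600s \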

If none of the above helps your situation, then it is something you need to take to your cluster's administrators. There could be something wrong with the DataNodes the executors land on that the admins are not yet aware of.

Happy coding!


YARN Capacity Scheduler: Queue Priority

Capacity Scheduler is designed to run Hadoop jobs in a shared, multi-tenant cluster in a friendly manner. Its main strength is that it guarantees a specific capacity for a certain group of users by supporting multiple queues and allowing users to submit their queries to their dedicated queues. Each queue is given a fraction of total cluster capacity (RAM and CPU), and all jobs submitted to a queue have access to the capacity dedicated to that queue.

Queue priority in Capacity Scheduler is implemented by assigning higher or lower capacity to the queues that should have higher or lower priority. Another way of enforcing this arrangement is by setting the maximum percentage of cluster resources each queue can use. Therefore, to assign a lower priority to a queue, we limit the amount of resources it can use.
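For reference, both of these knobs live in capacity-scheduler.xml. A minimal sketch with made-up queue names and percentages (the lower-priority queue gets a smaller capacity and a tighter maximum-capacity cap):

<!-- capacity-scheduler.xml: two queues, one favoured over the other -->
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>highprio,lowprio</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.highprio.capacity</name>
  <value>70</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.highprio.maximum-capacity</name>
  <value>100</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.lowprio.capacity</name>
  <value>30</value>
</property>
<property>
  <name>yarn.scheduler.capacity.root.lowprio.maximum-capacity</name>
  <value>40</value>
</property>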

Doing so on the default queue is a bit tricky, as all the jobs submitted to the platform go through the default queue and get their Application Master container created there. That is a very small container that controls application execution and requests resources for the YARN job submitted to the cluster. Having said that, we can use another setting in our platform that allows assigning higher priority to more important applications, by setting mapred.capacity-scheduler.queue.<queue-name>.supports-priority.

To see the settings of each queue in the cluster, navigate to the Resource Manager's web UI and click Scheduler in the left menu. Then click the arrow on the left-hand side of each queue to expand its settings. The 2 most important settings to check are Absolute Capacity (the queue capacity as a percentage) and Absolute Max Capacity (the maximum queue capacity as a percentage, which limits the elasticity for applications in the queue):

C Scheduler


Hive Performance Tuning

If you have been working in Big Data, you have definitely heard of Hive. Apache Hive is the data warehouse infrastructure built on top of Hadoop. Last week I gave a presentation for one of our clients on how to get the best out of Apache Hive, and I would like to share a few of those tips here. They are aimed at helping developers and analysts write better queries and get results from Hive faster.

To best understand how Hive works, we need to picture it as a file system. Data in each table is divided into partitions based on the partitioning strategy, and each partition is stored as its own set of physical files in HDFS. Files are replicated as many times as HDFS's replication factor dictates, which is usually 3.

Hive-01

Use Hive Partitions

As you have probably guessed, partitioning can be used to limit the files a query scans to complete its job, instead of having to go through each and every file. Using partitions is also more efficient than creating and then using indexes, as it physically limits the set of files each query scans.

The question for a developer or analyst would then be: “How can I know what partitions exist for a table?” SHOW PARTITIONS is the command that does this, and it can be run in Hue, Aginity, the Hive CLI, or whatever else you use to query Hive tables. The partition column can then be used in a query’s WHERE clause, the same way we filter on any other column. You can see that, as well as the difference using a partition in a query makes, below:

Hive_PT_02
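For reference, here is a minimal sketch of both in HiveQL; the table name, columns and values are made up, with src_date standing in as the partition column:

SHOW PARTITIONS sales_transactions;            -- lists partitions, e.g. src_date=2017-08-01

SELECT customer_id, amount
FROM sales_transactions
WHERE src_date = '2017-08-01'                  -- filter on the partition column first
  AND amount > 100;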

Check Query Plan

Like any other database, Hive provides a way to see the query plan it will use to execute a query. A query plan is the set of steps and commands the engine takes to execute the query and produce the result. You can put EXPLAIN at the beginning of your query to see the plan, or use the Explain button in Hue to do the same:

Hive_PT_03

I used the query we looked at previously to explain a couple of points on what to look for when checking query plans:

  1. TableScan: as its name implies, this step reads (scans) the table. In this example, we can clearly see that our filter on src_date is applied in the table scan, and since this column is our partition column, Hive knows exactly which files it should open and read to get the result it is looking for.
  2. Check the step at which the filter on the partition column is being applied. The sooner this filter is applied, the fewer records are passed to the downstream steps of query execution and the more efficient the query is.

Hive_PT_04
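The syntax itself is just a prefix on the query; a minimal sketch using the same made-up table as above:

EXPLAIN
SELECT customer_id, amount
FROM sales_transactions
WHERE src_date = '2017-08-01';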

Hive and Joins

When joining 2 tables, apply as many filters as possible on the bigger table in the join condition itself, instead of in the WHERE clause. This limits the number of records being joined to the smaller table and therefore leaves fewer records to work on or filter later.
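A minimal sketch of the difference, with made-up table and column names:

-- filter carried in the join condition: the big table is trimmed as it is joined
SELECT f.customer_id, d.category_name
FROM big_fact f
JOIN small_dim d
  ON f.category_id = d.category_id
 AND f.src_date = '2017-08-01';

-- versus filtering only afterwards in the WHERE clause
SELECT f.customer_id, d.category_name
FROM big_fact f
JOIN small_dim d
  ON f.category_id = d.category_id
WHERE f.src_date = '2017-08-01';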

 

Using all these techniques will help you get results from Hive faster, but nothing is more important than writing a good query in the first place. I was contacted by one of the analysts working for my client a few weeks ago, complaining that his query took more than 12 hours to run. At first glance I realized it was scanning one of the biggest tables we have 5 times, with no filter on the partition column. After spending a good hour on it and rewriting the query, we managed to get the same job done in about 25 minutes.

So, please write good queries. And always filter on the partition column. Good luck!


OBIEE RPD Design: Convert Snowflake to Star schema from multiple sources (combining dimensions)

As I play more with OBIEE, I learn more about what it is capable of and where its main power resides. OBIEE has 3 layers: Physical, Business Model and Mapping, and Presentation. The middle layer, BMM, is what makes OBIEE special: it is where we define how data from different sources and tables comes together to form a nice, clean star schema.

Like most of my posts, I am gonna explain how this is achievable by walking through a demo. I defined 2 databases in my local instance of SQL Server: one with 1 fact table and 3 dimensions, and another with a single dimension that is referenced by one of the dimensions in the first database. That gives us DimCustomer, DimLocation, DimProduct and FactTest in database Test, and DimProductCategory in database Test2.

Screen Shot 2015-02-10 at 9.56.51 am
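If you would like to set up a similar schema to follow along, something like the minimal T-SQL below would do. The column names are made up for illustration (only Product_ID appears later in the post) and, like the original demo, no foreign keys are declared:

-- in database Test (DimCustomer and DimLocation follow the same pattern)
CREATE TABLE DimProduct (
    Product_ID   INT PRIMARY KEY,
    ProductName  VARCHAR(100),
    Category_ID  INT            -- points at DimProductCategory in database Test2
);

CREATE TABLE FactTest (
    Product_ID   INT,
    Customer_ID  INT,
    Location_ID  INT,
    Amount       DECIMAL(18, 2)
);

-- in database Test2
CREATE TABLE DimProductCategory (
    Category_ID  INT PRIMARY KEY,
    CategoryName VARCHAR(100)
);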

Let’s go through the steps to create and design the RPD for our data source. Launch Oracle BI Administrator:

a) Create first database connection:

Click on File and then New Repository. This will launch the Create New Repository wizard. As you know, we can define only one data source using the Create Repository wizard; I used it to create my first database connection, to the Test database. I haven't listed the steps here because they are very straightforward.

b) Create second database connection:

1- Right-click on Physical pane of Oracle BI Administration Tool and select “New Database..”

2- Provide a name in the General tab. Click on the “Connection Pools” tab and then click on the green “+” sign.

3- Give the new Connection Pool a name, select the data source pointing to the second database from “Data Source Name” drop-down list and enter appropriate “User Name” and “Password”, as shown below. You will be prompted to re-enter the password after clicking Ok.

Screen Shot 2015-02-10 at 10.38.51 am

4- Click Ok again in New database window.

5- Now that we have added a new database and connection pool to our Physical layer, it’s time to add tables to the new connection. Right-click on the entry for the new connection pool (in my case, Connection Pool 2) and select “Import Metadata”. This will launch the Import Metadata wizard, where you can select the objects that should be added to the Physical layer.

6- Select the type of objects you want to import and click Next. In my tutorial, I select DimProductCategory from database Test2 and add it to Repository View, and then click Finish:

Screen Shot 2015-02-10 at 11.21.11 am

Your Physical layer pane should now have 2 entries for the 2 database connections, with the tables imported on each connection. My Physical layer looks like this:

Screen Shot 2015-02-10 at 11.33.17 am

c) Physical Diagram

Now it is time to define how the tables are joined together in the Physical layer.

1- Select tables from both connections, right-click and select “Physical Diagram” and then “Selected Object(s) Only”.

I didn’t define any foreign keys in my databases, therefore none of the tables are related to each other in my initial physical diagram and I have to define the joins manually.

2- To define the first join, click on New Join button, click on the fact table and then on DimProduct. This will create a join between those 2 tables based on the columns with the same name. You can change this in the next window that opens, “Physical Foreign Key”:

Screen Shot 2015-02-10 at 11.56.36 am

3- Do the same for the rest of the joins, including DimProduct and DimProductCategory. Note that there is no difference between joining the tables on the same database and tables that are on different databases. My diagram looks like this after setting up the join between all tables:

Screen Shot 2015-02-10 at 12.21.40 pm

d) Business Model and Mapping

After finishing with the Physical layer, it is time to define the business logic in the BMM layer. In this section, we are gonna create a new Business Model, add objects from the Physical layer to it, and define our star schema on top of the snowflake diagram we created in the Physical layer by combining DimCustomer & DimLocation into one dimension, and DimProduct & DimProductCategory into another.

1- Right-click in Business Model and Mapping pane and select “New Business Model..”. Give your model a name and un-check Disabled checkbox.

2- Drag and Drop your fact table (FactTest) into the new BMM model:

Screen Shot 2015-02-10 at 1.28.36 pm

3- Now it’s time to combine our first two tables and create a single dimension. Let’s start with DimProduct and DimProductCategory, the 2 tables that reside in separate databases. We will use a “Logical Table” to do this. Right-click on the Business Model, then “New Object”, and then select “Logical Table…”. The Logical Table window will open.

4- Give your new logical table a name in “General”. Let’s call it DimMasterProduct.

5- Click on “Sources” tab. As its name implies, this is where we can define the source(s) for our new logical table from tables available in Physical layer. Click on the green “+” button. Logical Table Source window opens.

6- Give it a name.

7- The next step is very important: click on the green “+” button below the Name textbox to map the logical table source to a physical table. A new window opens, listing all the tables used in our Physical diagram:

Screen Shot 2015-02-10 at 1.44.56 pm

Click on DimProduct and then click “Select” Button. You’ll see Logical Table Source window again, with DimProduct added as the source:

Screen Shot 2015-02-10 at 1.47.25 pm

8- Click on the “+” button again to add another table. You’ll see that instead of all tables in Physical layer, only those that are joined to DimProduct in Physical Diagram are listed here. Select DimProductCategory. You’ll see a new Join is defined and you can change it from inner join to right or left join:

Screen Shot 2015-02-10 at 1.51.04 pm

9- Click on “Column Mapping” tab. Here we can add the columns from our sources to our logical table. Click on “Add New Column” button. Logical column window opens.

10- Give your new column a name (Product_ID). Click on “Column Source” tab. Select (Click) the logical table source you created in previous step and click OK:

Screen Shot 2015-02-10 at 2.04.52 pm

A new Logical Column is added to the “Column Mapping” tab of “Logical Table Source”. Select the right column from Expression drop-down list:

Screen Shot 2015-02-10 at 2.09.01 pm

11- Repeat the steps to map the rest of columns for the logical table. I added 4 columns, as shown below:

Screen Shot 2015-02-10 at 2.11.17 pm

Click OK to go back to Logical Table window. Click OK to exit Logical Table window.

e) Business Model Diagram

Notice that both FactTest and DimMasterProduct have a # mark on them, implying they are fact tables. OBIEE treats all tables with no join pointing to them in the BMM layer as fact tables. To change this, we need to define our “Business Model Diagram”.

1- Right-click on your Business Model and select “Business Model Diagram”, and then “Whole Diagram”. My 2 tables in the BMM layer, namely FactTest and DimMasterProduct, are shown.

2- Like what we did for Physical Layer, join the 2 tables together by clicking on “New Join” button, and then FactTest and DimMasterProduct. Logical Join window opens:

Screen Shot 2015-02-10 at 2.29.55 pm

Notice the difference? Here, unlike in the Physical layer, we don’t need to define which columns will be used to join the 2 tables together: OBIEE works it out based on the physical relationship of the tables in the Physical layer.

My diagram looks like this now (Note there is no # on DimMasterProduct anymore):

Screen Shot 2015-02-10 at 2.48.01 pm

f) Presentation Layer

As you most probably know, you can add your BMM model to the Presentation layer just by dragging it and dropping it into the Presentation layer:

Screen Shot 2015-02-10 at 2.54.09 pm

Now, save your solution and when asked if you want OBIEE to check Global Consistency, click Yes. You’ll notice the global consistency check fails with an error about the logical table not having a primary key defined for it:

Screen Shot 2015-02-10 at 2.56.50 pm

This error can be fixed very easily:

1- Double-click on your logical table’s entry in BMM layer.

2- Go to “Keys” tab. There should be an empty row added.

3- Enter a name for the Key in “Key Name” column.

4- Select the column that is the primary key of your table from “Columns” drop-down.

5- Select the key you just created from “Primary Key” drop-down.

Screen Shot 2015-02-10 at 3.00.38 pm

6- Click Ok and save your RPD. Click yes for OBIEE to check Global Consistency.

7- Notice the change in the icon of Business Model in BMM layer.

Now your RPD is ready to be deployed and used for analysis. This method applies to any other scenario where 2 or more tables need to be joined together to create a single dimension; in other words, to convert a snowflake into a star schema.

Hope I didn’t confuse you with such a long post,

Cheers.