Troubleshooting Spark issues

Dragos Balan

Structure of this presentation

  • CPU / runtime

  • Memory issues

  • Disk and network

Disclaimer: they are all related

Obvious

right tool for the right job

executor / driver configuration (cores, memory)

executors per instance = (total number of virtual cores per instance - 1)/ spark.executor.cores

executor memory = spark.executor.memory + spark.executor.memoryOverhead (or yarn)

number of partitions per core
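Worked example (hypothetical instance type): with 16 virtual cores per instance and spark.executor.cores = 5, you get (16 - 1) / 5 = 3 executors per instance; with spark.executor.memory = 10g and spark.executor.memoryOverhead = 1g, each executor asks YARN for 11 GB of the instance's memory.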

CPU / runtime performance issues

Symptoms: slow processing time

Symptoms: just a fraction of your processing power is used

Solutions

check CPU hotspots

CPU Hotspots

unneeded processing

case class OpportunityRequestData(oppo: ForecastedOppoWithPoi)
extends RequestData {

  // eager val: the query-param map is rebuilt every time an OpportunityRequestData is constructed
  val queryParamMapFromString = QueryParamMapBuilder.buildQueryParamMap(
    oppo.forecastedOppo.getUrlQuery,
    oppo.commaSeparatedPois
  )

  override def getOperatingSystem: String = ???

  //other getters here ...
}

unneeded processing

def matchOpportunityWithCampaign(oppoWithPoi: ForecastedOppoWithPoi,
                                 campaign: Campaign): Iterable[(Zone, Banner)] =
  for {
    banner <- campaign.getBanners
    zone   <- matchZones(oppoWithPoi.forecastedOppo, banner)
    if matchesLimitations(oppoWithPoi, banner)
  } yield zone -> banner

unneeded processing

def matchesLimitations(opportunity: ForecastedOppoWithPoi, banner: Banner): Boolean = {
  // a new OpportunityRequestData is constructed for every banner,
  // re-running buildQueryParamMap even though it only depends on the opportunity
  limitationEvaluator.evaluate(
    banner.getLimitation,
    OpportunityRequestData(opportunity)
  )
}
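One way to avoid the repeated work, sketched below under the assumption that not every limitation actually reads the query-param map: make the val lazy (or build the request data once per opportunity instead of once per banner), so the map is only computed when it is needed.

// sketch of a possible fix: defer the expensive construction until it is actually read
case class OpportunityRequestData(oppo: ForecastedOppoWithPoi) extends RequestData {

  // lazy val: computed at most once per instance, and only if a limitation asks for it
  lazy val queryParamMapFromString = QueryParamMapBuilder.buildQueryParamMap(
    oppo.forecastedOppo.getUrlQuery,
    oppo.commaSeparatedPois
  )

  override def getOperatingSystem: String = ???
}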

persist when re-using RDDs

val rdd = dataComputation()
val partitionedRdd = rdd.partitionBy(customPartitioner)

// persist before the two actions, otherwise the computation and the shuffle run twice
partitionedRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

val result1 = partitionedRdd.map(...).collect()
val result2 = partitionedRdd.flatMap(...).saveAsHadoop()

Kryo serialization

[Figure 1. Java serializers speed comparison]

Kryo serialization

[Figure 2. Java serializers memory consumption comparison]

more details here

Kryo serialization

How to set up Kryo in Spark
sparkConfig
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")   // fail fast when a class is not registered
  .set("spark.kryo.registrator", classOf[ForecasterKryoRegistrator].getName)

Besides the default serializers provided by Kryo, there are other libraries that can help with this.

avoid unnecessary shuffling

groupByKey
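A sketch of why groupByKey is the usual suspect (the pair RDD is hypothetical): groupByKey ships every value through the shuffle before aggregating, while reduceByKey pre-aggregates on the map side.

import org.apache.spark.rdd.RDD

// hypothetical example: counting opportunities per campaign id
def countPerCampaign(ids: RDD[String]): RDD[(String, Long)] = {
  val pairs = ids.map(id => (id, 1L))

  // heavy: every single 1L crosses the network, then gets summed on the reduce side
  // val heavy = pairs.groupByKey().mapValues(_.sum)

  // lighter: partial sums are computed before the shuffle
  pairs.reduceByKey(_ + _)
}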

avoid unnecessary shuffling

balanced partitions

[Figure: cluster CPU showing an uneven distribution of tasks]

prefer async collection operations

  • collectAsync

  • unpersist(blocking = false)

combine multiple RDD.maps/flatMaps/filters

into one single RDD.map operation if possible
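Each chained transformation adds another iterator layer and an extra function call per record; fusing them removes that overhead. A sketch (rdd, parse, enrich and isValid are hypothetical):

// instead of: rdd.map(parse).filter(_.isValid).map(enrich)
val fused = rdd.flatMap { raw =>
  val parsed = parse(raw)
  if (parsed.isValid) Some(enrich(parsed)) else None
}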

logging is heavy

Memory issues

Explicit Symptoms 1

java.lang.OutOfMemoryError: GC overhead limit exceeded

Explicit Symptoms 2

OpenJDK 64-Bit Server VM warning:
INFO: os::commit_memory(0x0000000654a5e000, 837603328, 0) failed; error='Cannot allocate memory' (errno=12)
There is insufficient memory for the Java Runtime Environment to continue.
Native memory allocation (mmap) failed to map 837603328 bytes for committing reserved memory.

Explicit Symptoms 3

Container killed by YARN for exceeding memory limits 12.4 GB of 12 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.
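If the container really does need more off-heap room (Kryo buffers, Netty, native libraries), raising the overhead is the first step, as the message itself suggests. A sketch of the corresponding setting:

sparkConfig
  .set("spark.yarn.executor.memoryOverhead", "2048")  // in MB; legacy key quoted in the message above
  // newer Spark versions use spark.executor.memoryOverhead instead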

Hidden Out Of Memory Symptoms 1

  TimeoutException

Hidden Out Of Memory Symptoms 2

20/08/17 14:09:23 WARN DataStreamer:
Exception for BP-2134224599-192.168.152.25-1597671528722:blk_1073741843_1019
java.io.EOFException: Unexpected EOF while trying to read response from server
    at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:402)
    at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213)
    at org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1073)

Solutions

check your GC logs

In an EMR environment they are written to stdout.

2020-09-29T07:16:17.239+0000: [GC (Allocation Failure) 2020-09-29T07:16:17.239+0000: [ParNew: 629120K->30791K(629120K), 0.2974821 secs] 633015K->98181K(2027264K), 0.2975878 secs] [Times: user=3.71 sys=0.06, real=0.30 secs]
2020-09-29T07:16:20.218+0000: [GC (Allocation Failure) 2020-09-29T07:16:20.218+0000: [ParNew: 590023K->69888K(629120K), 0.2883358 secs] 657413K->195919K(2027264K), 0.2884263 secs] [Times: user=0.71 sys=0.04, real=0.29 secs]
2020-09-29T07:16:21.062+0000: [GC (Allocation Failure) 2020-09-29T07:16:21.062+0000: [ParNew: 629120K->69888K(629120K), 1.2208962 secs] 755151K->592684K(2027264K), 1.2209997 secs] [Times: user=2.76 sys=0.39, real=1.22 secs]
2020-09-29T07:16:22.283+0000: [GC (CMS Initial Mark) [1 CMS-initial-mark: 522796K(1398144K)] 601001K(2027264K), 0.0134823 secs] [Times: user=0.02 sys=0.00, real=0.01 secs]

If needed, you can run them through GC Easy.
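If GC logging is not already enabled for your executors, it can be turned on through the executor JVM options. A sketch using the classic JDK 8 flags that match the ParNew/CMS output above:

sparkConfig.set("spark.executor.extraJavaOptions",
  "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps")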

too much data on driver

  • collect()

  • collectAsMap()
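A few alternatives that keep the full result off the driver heap (a sketch; bigRdd and the output path are hypothetical):

val sample = bigRdd.take(100)                    // only a bounded sample reaches the driver
bigRdd.saveAsTextFile("hdfs:///tmp/results")     // let the executors write the output
val localIt = bigRdd.toLocalIterator             // streams one partition at a time to the driver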

unpersist broadcasted variables

  • as soon as possible

  • synchronously (blocking) if memory pressure is high
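A sketch of the pattern (sc, rdd, buildLookupTable and outputPath are hypothetical):

val lookup = sc.broadcast(buildLookupTable())
try {
  rdd.map(key => lookup.value.getOrElse(key, 0.0)).saveAsTextFile(outputPath)
} finally {
  // blocking = true ("sync") waits until the executors have actually freed the memory
  lookup.unpersist(blocking = true)
}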

prefer Iterator-based APIs

  • mapPartitions

  • map (a hidden mapPartitions underneath)
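A sketch of the mapPartitions pattern (rawLines and createParser are hypothetical):

val parsed = rawLines.mapPartitions { records =>
  val parser = createParser()   // expensive setup, done once per partition
  records.map(parser.parse)     // records is an Iterator: applied lazily, record by record
}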

avoid wrapper classes

primitive       primitive size    wrapper size
boolean         1 bit (?)         128 bits
byte            8 bits            128 bits
short, char     16 bits           128 bits
int, float      32 bits           128 bits
long, double    64 bits           192 bits

Example:

if-allocation:

300,000 campaigns x 4000 hours (approx.) x 192 bits (boxed Double) ≈ 21 GB

300,000 campaigns x 4000 hours (approx.) x 64 bits (primitive double) ≈ 7.15 GB

use primitive-based collections instead of classic Java collections

see FastUtil collections
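A sketch of the difference (assuming fastutil is on the classpath):

import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap

// boxed: every entry allocates a java.lang.Long key and a java.lang.Double value
val boxed = new java.util.HashMap[java.lang.Long, java.lang.Double]()
boxed.put(42L, 0.97)

// primitive-backed: keys and values live in long[] / double[] arrays, no boxing
val primitive = new Long2DoubleOpenHashMap()
primitive.put(42L, 0.97)
val v: Double = primitive.get(42L)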

if desperate:

  • prefer RDD.map / flatMap over Iterator.map / flatMap

  • avoid transfers from Java to Scala collections and vice versa

  • avoid Option

detect memory size of your structures

The best way to size the amount of memory consumption a dataset will require is to create an RDD, put it into cache, and look at the “Storage” page in the web UI.

detect memory size of your structures

[Figure: RDD memory size on the Storage page of the Spark UI]

detect memory size of your structures

[Figure: per-partition memory size in the Spark UI]

detect memory size of your structures

To estimate the memory consumption of a particular object, use SizeEstimator’s estimate method.

  • does not work for lazy structures (RDDs, Iterators)
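A sketch of using it on a materialised collection (loadCampaigns is hypothetical):

import org.apache.spark.util.SizeEstimator

val campaigns: Array[Campaign] = loadCampaigns()   // force materialisation first
println(s"approx. heap size: ${SizeEstimator.estimate(campaigns)} bytes")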

heavy-on-memory documented rdd operations

  • groupByKey, reduceByKey, combineByKey, aggregateByKey

  • cogroup, join

[1] [2]

memory-heavy rdd operations

Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large.

memory-heavy rdd operations

Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller.
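A sketch of raising the parallelism (pairs is a hypothetical pair RDD; 400 is only an illustrative value):

val summed = pairs.reduceByKey(_ + _, 400)        // explicit number of output partitions

sparkConfig
  .set("spark.default.parallelism", "400")
  .set("spark.sql.shuffle.partitions", "400")     // DataFrame / SQL shuffles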

spark.executor.memory, spark.memory.fraction, spark.memory.storageFraction

[Figure: how executor memory is split by spark.memory.fraction and spark.memory.storageFraction]

heap < 32 GB

On an LP64 system, the heap used by a given program might have to be around 1.5 times larger than when it is run on an ILP32 system

heap < 32 GB

compressed ordinary object pointers is supported and enabled by default in Java SE 6u23 and later. In Java SE 7, compressed oops is enabled by default for 64-bit JVM processes when -Xmx isn’t specified and for values of -Xmx less than 32 gigabytes.
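In practice this is an argument for several executors with heaps just under the threshold rather than one huge heap; for example:

// keeps the executor heap below the ~32 GB compressed-oops limit
sparkConfig.set("spark.executor.memory", "31g")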

the path to GC root

who is holding a reference to your data

obvious options:

  • consider increasing the driver / executor heap and their overhead

  • hardware configuration

Disk / Networking issues

Symptoms: disk full errors

du is magic

sudo du -a / | sort -n -r | head -10

du is magic

[Figure: du output for the Spark application storage directories]

check the size of your persisted data (spark ui)

[Figure: persisted data on the Spark UI Storage tab]

check the size of your persisted data

[Figure: RDD persisted size per partition]

control and compress your shuffles if needed

 spark-submit \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.spill.compress=true \
  App

Kryo serialization also reduces the size of the shuffled data

control the size of HDFS blocks
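Block size for the files your job writes can be set through the Hadoop configuration (a sketch; 256 MB is only an illustrative value):

// dfs.blocksize is picked up when this application creates new HDFS files
sc.hadoopConfiguration.set("dfs.blocksize", (256L * 1024 * 1024).toString)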

logging is heavy