Dragos Balan
CPU / runtime
Memory issues
Disk and network
Disclaimer: they are all related
executors per instance = (total number of virtual cores per instance - 1) / spark.executor.cores
executor memory = spark.executor.memory + spark.executor.memoryOverhead (spark.yarn.executor.memoryOverhead on YARN)
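As a hypothetical example: on an instance with 16 virtual cores and 64 GB of RAM, with spark.executor.cores = 5, that gives (16 - 1) / 5 = 3 executors per instance; with spark.executor.memory = 18g and a 2g memoryOverhead, each executor claims 20g, so the three executors use 60g of the instance's RAM.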
case class OpportunityRequestData(oppo: ForecastedOppoWithPoi)
  extends RequestData {

  val queryParamMapFromString = QueryParamMapBuilder.buildQueryParamMap(
    oppo.forecastedOppo.getUrlQuery,
    oppo.commaSeparatedPois
  )

  override def getOperatingSystem: String = ???
  // other getters here ...
}
def matchOpportunityWithCampaign(oppoWithPoi: ForecastedOppoWithPoi,
                                 campaign: Campaign): Iterable[(Zone, Banner)] =
  for {
    banner <- campaign.getBanners
    zone   <- matchZones(oppoWithPoi.forecastedOppo, banner)
    if matchesLimitations(oppoWithPoi, banner)
  } yield zone -> banner
def matchesLimitations(opportunity: ForecastedOppoWithPoi, banner: Banner): Boolean = {
  limitationEvaluator.evaluate(
    banner.getLimitation,
    OpportunityRequestData(opportunity)
  )
}
// compute, repartition with the custom partitioner, and cache the result once
val rdd = dataComputation()
val partitionedRdd = rdd.partitionBy(customPartitioner)
partitionedRdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

// both actions reuse the persisted, already-partitioned data instead of recomputing it
val result1 = partitionedRdd.map(...).collect()
val result2 = partitionedRdd.flatMap(...).saveAsHadoopFile(...)
sparkConfig
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.set("spark.kryo.registrator", classOf[ForecasterKryoRegistrator].getName)
Besides the default serializers shipped with Kryo, there are other libraries that provide additional, ready-made serializers for common classes.
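A hedged sketch of what the registrator wired into the config above might look like; the class list is illustrative, not the project's actual one:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class ForecasterKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // with spark.kryo.registrationRequired=true, every serialized class
    // must be registered explicitly, otherwise the job fails fast
    kryo.register(classOf[ForecastedOppoWithPoi])
    kryo.register(classOf[Campaign])
    kryo.register(classOf[Banner])
    kryo.register(classOf[Array[Double]])
  }
}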
collectAsync
unpersist(blocking = false)
combine consecutive transformations into one single RDD.map operation if possible (see the sketch below)
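A minimal sketch of the non-blocking pattern above, assuming a SparkSession named spark; the RDD contents are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import scala.concurrent.ExecutionContext.Implicits.global

val spark = SparkSession.builder().appName("async-sketch").getOrCreate()
val cached = spark.sparkContext
  .parallelize(1 to 1000000)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// collectAsync returns a FutureAction, so the driver thread is not blocked
val future = cached.collectAsync()
future.onComplete { _ =>
  // release the cached blocks without waiting for the executors to confirm
  cached.unpersist(blocking = false)
}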
java.lang.OutOfMemoryError : GC overhead limit exceeded
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000654a5e000, 837603328, 0) failed; error='Cannot allocate memory' (errno=12)
There is insufficient memory for the Java Runtime Environment to continue.
Native memory allocation (mmap) failed to map 837603328 bytes for committing reserved memory.
Container killed by YARN for exceeding memory limits 12.4 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
TimeoutException
20/08/17 14:09:23 WARN DataStreamer: Exception for BP-2134224599-192.168.152.25-1597671528722:blk_1073741843_1019
java.io.EOFException: Unexpected EOF while trying to read response from server
  at org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed(PBHelperClient.java:402)
  at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:213)
  at org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor.run(DataStreamer.java:1073)
In an EMR environment these logs end up in stdout.
2020-09-29T07:16:17.239+0000: [GC (Allocation Failure) 2020-09-29T07:16:17.239+0000: [ParNew: 629120K->30791K(629120K), 0.2974821 secs] 633015K->98181K(2027264K), 0.2975878 secs] [Times: user=3.71 sys=0.06, real=0.30 secs]
2020-09-29T07:16:20.218+0000: [GC (Allocation Failure) 2020-09-29T07:16:20.218+0000: [ParNew: 590023K->69888K(629120K), 0.2883358 secs] 657413K->195919K(2027264K), 0.2884263 secs] [Times: user=0.71 sys=0.04, real=0.29 secs]
2020-09-29T07:16:21.062+0000: [GC (Allocation Failure) 2020-09-29T07:16:21.062+0000: [ParNew: 629120K->69888K(629120K), 1.2208962 secs] 755151K->592684K(2027264K), 1.2209997 secs] [Times: user=2.76 sys=0.39, real=1.22 secs]
2020-09-29T07:16:22.283+0000: [GC (CMS Initial Mark) [1 CMS-initial-mark: 522796K(1398144K)] 601001K(2027264K), 0.0134823 secs] [Times: user=0.02 sys=0.00, real=0.01 secs]
collect()
collectAsMap()
as soon as possible
sync if hot
mapPartitions
map (internally implemented via mapPartitions), see the sketch below
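A hedged sketch of the mapPartitions idea: pay an expensive setup cost once per partition instead of once per record. ExpensiveClient and enrich are hypothetical placeholders, and spark is the SparkSession from the earlier sketch.

// hypothetical stand-in for an expensive, non-serializable resource
class ExpensiveClient {
  def enrich(value: Int): String = s"enriched-$value"
}

val records = spark.sparkContext.parallelize(1 to 1000000)

val enriched = records.mapPartitions { partition =>
  val client = new ExpensiveClient()              // created once per partition, not per record
  partition.map(record => client.enrich(record))
}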
primitive/wrapper | primitive size | wrapper size
boolean           | 1 bit ?        | 128 bits
byte              | 8 bits         | 128 bits
short, char       | 16 bits        | 128 bits
int, float        | 32 bits        | 128 bits
long, double      | 64 bits        | 192 bits
if-allocation:
300,000 campaigns x 4,000 hours (approx.) x 192 bits (boxed Double) ≈ 21 GB
300,000 campaigns x 4,000 hours (approx.) x 64 bits (primitive double) ≈ 7.15 GB
because of Java object overhead (headers, padding, references)
see the fastutil collections (sketch after this list)
prefer RDD.map / flatMap over Iterator.map / flatMap
avoid conversions between Java and Scala collections and vice versa
avoid Option
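For illustration, a short sketch with a fastutil primitive map; the key/value semantics (hour bucket to forecast value) are hypothetical. A Long2DoubleOpenHashMap keeps keys and values unboxed, avoiding the per-entry wrapper overhead shown in the table above.

import it.unimi.dsi.fastutil.longs.Long2DoubleOpenHashMap

// primitive long keys and double values, no boxing per entry
val hourlyForecast = new Long2DoubleOpenHashMap()
hourlyForecast.put(2020092907L, 1234.5)
val impressions: Double = hourlyForecast.get(2020092907L)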
The best way to size the amount of memory consumption a dataset will require is to create an RDD, put it into cache, and look at the “Storage” page in the web UI.
To estimate the memory consumption of a particular object, use SizeEstimator’s estimate method.
does not work for lazy structures (RDDs, Iterators)
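A quick sketch of the SizeEstimator call on two already-materialized arrays; the arrays are illustrative.

import org.apache.spark.util.SizeEstimator

val boxed: Array[java.lang.Double] = (1 to 1000000).map(i => java.lang.Double.valueOf(i.toDouble)).toArray
val primitive: Array[Double] = (1 to 1000000).map(_.toDouble).toArray

println(SizeEstimator.estimate(boxed))       // noticeably larger: one object plus one reference per element
println(SizeEstimator.estimate(primitive))   // roughly 8 bytes per element plus the array header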
Sometimes, you will get an OutOfMemoryError not because your RDDs don’t fit in memory, but because the working set of one of your tasks, such as one of the reduce tasks in groupByKey, was too large.
Spark’s shuffle operations (sortByKey, groupByKey, reduceByKey, join, etc) build a hash table within each task to perform the grouping, which can often be large. The simplest fix here is to increase the level of parallelism, so that each task’s input set is smaller.
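A hedged sketch of the "increase the level of parallelism" fix: pass an explicit, larger partition count to the shuffle operator so each task builds a smaller hash table. pairRdd and the value 2000 are placeholders.

// more output partitions => smaller per-task working set during the shuffle
val aggregated = pairRdd.reduceByKey(_ + _, numPartitions = 2000)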
On an LP64 system, the heap used by a given program might have to be around 1.5 times larger than when it is run on an ILP32 system
compressed ordinary object pointers is supported and enabled by default in Java SE 6u23 and later. In Java SE 7, compressed oops is enabled by default for 64-bit JVM processes when -Xmx isn’t specified and for values of -Xmx less than 32 gigabytes.
consider increasing the driver / executor heap and their overhead
hardware configuration
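One hedged way to apply the two points above from the application itself, assuming Spark 2.3+ configuration names; the sizes are placeholders and should follow the formulas at the start of the deck. In practice the driver settings are usually passed to spark-submit instead (as in the spark-submit example below), because the driver JVM is already running by the time this code executes.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sizing-sketch")
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "10g")          // executor heap
  .config("spark.executor.memoryOverhead", "2g")   // native / off-heap allowance
  .config("spark.driver.memory", "6g")
  .config("spark.driver.memoryOverhead", "1g")
  .getOrCreate()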
sudo du -a / | sort -n -r | head -10   # 10 largest files/directories on the node
spark-submit \
  --conf spark.shuffle.compress=true \
  --conf spark.shuffle.spill.compress=true \
  App