spark simpleCosting 源码
spark simpleCosting 代码
文件路径:/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/simpleCosting.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import org.apache.spark.sql.execution.joins.ShuffledJoin
/**
 * A minimal [[Cost]] implementation backed by a single [[Long]] value.
 *
 * Two [[SimpleCost]] instances are ordered by their `value`; comparing against any other
 * kind of [[Cost]] is rejected with an error.
 *
 * @param value the numeric cost; a smaller value compares as lower
 */
case class SimpleCost(value: Long) extends Cost {

  /**
   * Compares this cost with `that`.
   *
   * @param that the cost to compare against
   * @return -1 if this cost's value is smaller, 1 if it is larger, 0 if both are equal
   * @note throws the error built by
   *       `QueryExecutionErrors.cannotCompareCostWithTargetCostError` when `that` is not a
   *       [[SimpleCost]]
   */
  override def compare(that: Cost): Int = that match {
    case other: SimpleCost =>
      val otherValue = other.value
      if (value == otherValue) {
        0
      } else if (value < otherValue) {
        -1
      } else {
        1
      }
    case _ =>
      // Only costs of the same concrete kind can be ordered against each other.
      throw QueryExecutionErrors.cannotCompareCostWithTargetCostError(that.toString)
  }
}
/**
 * A skew-join-aware [[CostEvaluator]] that derives a [[SimpleCost]] from the number of
 * [[ShuffleExchangeLike]] nodes in a plan and, optionally, the number of skew join nodes.
 *
 * @param forceOptimizeSkewedJoin when true, the number of skew joins is folded into the cost
 *                                so that it takes precedence over the shuffle count
 */
case class SimpleCostEvaluator(forceOptimizeSkewedJoin: Boolean) extends CostEvaluator {

  /**
   * Computes the cost of `plan`.
   *
   * @param plan the physical plan to evaluate
   * @return a [[SimpleCost]] holding the shuffle count, combined with the negated skew join
   *         count in the upper bits when `forceOptimizeSkewedJoin` is enabled
   */
  override def evaluateCost(plan: SparkPlan): Cost = {
    val shuffleCount = plan.collect {
      case exchange: ShuffleExchangeLike => exchange
    }.length

    if (!forceOptimizeSkewedJoin) {
      SimpleCost(shuffleCount)
    } else {
      val skewJoinCount = plan.collect {
        case join: ShuffledJoin if join.isSkewJoin => join
      }.length
      // The negated skew join count occupies the upper 32 bits of the long, so it dominates
      // the comparison: more skew joins yields a lower cost. The shuffle count fills the
      // lower bits and only breaks ties between plans with the same number of skew joins.
      val skewJoinBits = (-skewJoinCount.toLong) << 32
      SimpleCost(skewJoinBits | shuffleCount)
    }
  }
}
相关信息
相关文章
spark AQEPropagateEmptyRelation 源码
spark AdaptiveSparkPlanExec 源码
spark AdaptiveSparkPlanHelper 源码
0
赞
- 所属分类: 前端技术
- 本文标签:
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦