spark AQEUtils source code
spark AQEUtils code listing
File path: /sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEUtils.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashPartitioning, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{CollectMetricsExec, DeserializeToObjectExec, FilterExec, ProjectExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_COL, REPARTITION_BY_NUM, ShuffleExchangeExec}
object AQEUtils {
  // Walks the given plan and derives the distribution requirement imposed by a
  // user-specified repartition, if any. Returning `None` means the plan's output
  // partitioning cannot be expressed as a required distribution, so the
  // user-specified repartition must be kept as-is.
  def getRequiredDistribution(p: SparkPlan): Option[Distribution] = p match {
    // A user-specified repartition only takes effect when it sits at the root, or is
    // separated from the root solely by Project/Filter/local Sort/CollectMetrics nodes.
    // Note: only `HashPartitioning` matters here, since `EnsureRequirements` can only
    // optimize out user-specified repartitions that use `HashPartitioning`.
    case ShuffleExchangeExec(hash: HashPartitioning, _, origin)
        if origin == REPARTITION_BY_COL || origin == REPARTITION_BY_NUM =>
      // Pin the partition count only when the user explicitly asked for one.
      val desiredNumPartitions =
        if (origin == REPARTITION_BY_NUM) Some(hash.numPartitions) else None
      Some(ClusteredDistribution(hash.expressions, requiredNumPartitions = desiredNumPartitions))
    case filter: FilterExec => getRequiredDistribution(filter.child)
    case sort: SortExec if !sort.global => getRequiredDistribution(sort.child)
    case metrics: CollectMetricsExec => getRequiredDistribution(metrics.child)
    case deser: DeserializeToObjectExec => getRequiredDistribution(deser.child)
    case proj: ProjectExec =>
      getRequiredDistribution(proj.child).flatMap {
        case clustered: ClusteredDistribution
            if !clustered.clustering.forall(e => proj.projectList.exists(_.semanticEquals(e))) =>
          // The user-specified repartition may still be effective even though the project
          // drops some of the clustering columns, e.g. `df.repartition(a, b).select(c)`.
          // A required distribution cannot express that case, so return None and later on
          // `EnsureRequirements` will skip optimizing out the user-specified repartition.
          None
        case other => Some(other)
      }
    case _ => Some(UnspecifiedDistribution)
  }
}
Related information
Related articles
spark AQEPropagateEmptyRelation source code
spark AdaptiveSparkPlanExec source code
spark AdaptiveSparkPlanHelper source code
spark AdjustShuffleExchangePosition source code
0
Likes
- Category: front-end technology
- Tags:
Popular recommendations
-
2. - Featured articles
-
3. gate.io
-
7. golang
-
9. openharmony
-
10. Auto-focusing an input element in Vue