Spark RocksDBFileManager source code
File path: /sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming.state
import java.io.{File, FileInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.util.Utils
/**
* Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
* For each version, the checkpoint is saved in a specific directory structure that allows
* successive versions to reuse SST data files and archived log files. This allows each commit
* to be incremental; only the new SST files and archived log files generated by RocksDB need
* to be uploaded.
* The directory structures on local disk and in DFS are as follows.
*
* Local checkpoint dir structure
* ------------------------------
* RocksDB generates a bunch of files in the local checkpoint directory. The most important among
them are the SST files; they are the actual log-structured data files. The rest of the files
contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
Note that the SST files are hard links to files in RocksDB's working directory, and therefore
successive checkpoints can share some of the SST files. So these SST files have to be copied to
DFS in a shared directory such that different committed versions can share them.
*
* We consider both SST files and archived log files as immutable files which can be shared between
* different checkpoints.
*
* localCheckpointDir
*    |
*    +-- OPTIONS-000005
*    +-- MANIFEST-000008
*    +-- CURRENT
*    +-- 00007.sst
*    +-- 00011.sst
*    +-- archive
*    |     +-- 00008.log
*    |     +-- 00013.log
* ...
*
*
* DFS directory structure after saving to DFS as version 10
* -----------------------------------------------------------
* The SST and archived log files are given unique file names and copied to the shared subdirectory.
* Every version maintains a mapping of local immutable file name to the unique file name in DFS.
* This mapping is saved in a JSON file (named `metadata`), which is zipped along with other
* checkpoint files into a single file `[version].zip`.
*
* dfsRootDir
*    |
*    +-- SSTs
*    |     +-- 00007-[uuid1].sst
*    |     +-- 00011-[uuid2].sst
*    +-- logs
*    |     +-- 00008-[uuid3].log
*    |     +-- 00013-[uuid4].log
*    +-- 10.zip
*    |     +-- metadata  <--- contains mapping between 00007.sst and [uuid1].sst,
*    |                        and the mapping between 00008.log and [uuid3].log
*    |     +-- OPTIONS-000005
*    |     +-- MANIFEST-000008
*    |     +-- CURRENT
*    |     ...
*    |
*    +-- 9.zip
*    +-- 8.zip
* ...
*
* Note the following.
* - Each [version].zip is a complete description of all the data and metadata needed to recover
* a RocksDB instance at the corresponding version. The SST files and log files are not included
* in the zip files, as they can be shared across different versions. This is unlike the
* [version].delta files of HDFSBackedStateStore, where previous delta files need to be read
* to recover the state.
* - This is safe with respect to speculatively executed tasks running concurrently in different
* as each task would upload a different copy of the generated immutable files and
* atomically update the [version].zip.
* - Immutable files are identified uniquely based on their file name and file size.
* - Immutable files can be reused only across adjacent checkpoints/versions.
* - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
* different thread than the task thread saving files.
*
* @param dfsRootDir Directory where the [version].zip files will be stored
* @param localTempDir Local directory for temporary work
* @param hadoopConf Hadoop configuration for talking to DFS
* @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
*/
class RocksDBFileManager(
dfsRootDir: String,
localTempDir: File,
hadoopConf: Configuration,
loggingId: String = "")
extends Logging {
import RocksDBImmutableFile._
private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
private val onlyZipFiles = new PathFilter {
override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
}
/**
* Metrics for loading a checkpoint from DFS. Every loadCheckpointFromDfs call updates these
* metrics, so this effectively records the latest metrics.
*/
@volatile private var loadCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
/**
* Metrics for saving a checkpoint to DFS. Every saveCheckpointToDfs call updates these
* metrics, so this effectively records the latest metrics.
*/
@volatile private var saveCheckpointMetrics = RocksDBFileManagerMetrics.EMPTY_METRICS
def latestLoadCheckpointMetrics: RocksDBFileManagerMetrics = loadCheckpointMetrics
def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
/** Save all the files in given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
val metadataFile = localMetadataFile(checkpointDir)
metadata.writeToFile(metadataFile)
logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
if (version <= 1 && numKeys <= 0) {
// If we're writing the initial version and there's no data, we have to explicitly initialize
// the root directory. Normally saveImmutableFilesToDfs does this initialization, but
// when there's no data that method won't write any files, and zipToDfsFile uses the
// CheckpointFileManager.createAtomic API, which doesn't auto-initialize parent directories.
// Moreover, when tracking the number of keys is disabled (numKeys is -1), we still need
// to create the initial DFS root directory anyway.
val path = new Path(dfsRootDir)
if (!fm.exists(path)) fm.mkdirs(path)
}
zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
logInfo(s"Saved checkpoint file for version $version")
}
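// Illustration (not in the original source): committing version 12 whose checkpoint contains
// a new 00021.sst uploads it once as SSTs/00021-<uuid>.sst, then atomically writes 12.zip
// holding MANIFEST/OPTIONS/CURRENT plus the `metadata` JSON that maps 00021.sst to that
// uploaded name. The example file names here are hypothetical.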
/**
* Load all necessary files for a specific checkpoint version from DFS to the given local directory.
* If version is 0, then it will delete all files in the directory. For other versions, it
* ensures that only the exact files generated during checkpointing will be present in the
* local directory.
*/
def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = {
logInfo(s"Loading checkpoint files for version $version")
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
// Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
listRocksDBFiles(localDir)._2.foreach(_.delete())
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localDir)
// Copy the necessary immutable files
val metadataFile = localMetadataFile(localDir)
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
versionToRocksDBFiles.put(version, metadata.immutableFiles)
metadataFile.delete()
metadata
}
logFilesInDir(localDir, s"Loaded checkpoint files for version $version")
metadata
}
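// Illustration (not in the original source): loading version 10 unzips 10.zip into localDir,
// then downloads only those SST/log files from the metadata mapping that are not already
// present locally with a matching name and size; matching local files are reused as-is.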
/** Get the latest version available in the DFS directory. If no data is present, it returns 0. */
def getLatestVersion(): Long = {
val path = new Path(dfsRootDir)
if (fm.exists(path)) {
fm.list(path, onlyZipFiles)
.map(_.getPath.getName.stripSuffix(".zip"))
.map(_.toLong)
.foldLeft(0L)(math.max)
} else {
0
}
}
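// For example, with 8.zip, 9.zip and 10.zip present under dfsRootDir, getLatestVersion()
// returns 10; a missing or empty root directory yields 0.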
/**
* Delete old versions by deleting the associated version and SST files.
* At a high level, this method finds which versions to delete, and which SST files were
* last used in those versions. It's safe to delete these SST files because an SST file can
* be reused only in successive versions. Therefore, if an SST file F was last used in version
* V, then it won't be used in version V+1 or later, and if version V can be deleted, then
* F can safely be deleted as well.
*
* To find old files, it does the following.
* - List all the existing [version].zip files
* - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
* - Accordingly decide which versions should be deleted.
* - Resolve all SST files of all the existing versions, if not already resolved.
* - Find what was the latest version in which each SST file was used.
* - Delete the files that were last used in the to-be-deleted versions as we will not
* need those files any more.
*
* Note that it only deletes files that it knows are safe to delete.
* It may not delete the following files.
* - Partially written SST files
* - SST files that were used in a version, but that version got overwritten with a different
* set of SST files.
*/
def deleteOldVersions(numVersionsToRetain: Int): Unit = {
val path = new Path(dfsRootDir)
// All versions present in DFS, sorted
val sortedVersions = fm.list(path, onlyZipFiles)
.map(_.getPath.getName.stripSuffix(".zip"))
.map(_.toLong)
.sorted
// Return if no versions generated yet
if (sortedVersions.isEmpty) return
// Find the versions to delete
val maxVersionPresent = sortedVersions.last
val minVersionPresent = sortedVersions.head
val minVersionToRetain =
math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]
// Return if no version to delete
if (versionsToDelete.isEmpty) return
logInfo(
s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
s"cleaning up all versions older than $minVersionToRetain to retain last " +
s"$numVersionsToRetain versions")
// Resolve RocksDB files for all the versions and find the max version each file is used
val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
sortedVersions.foreach { version =>
val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
val newResolvedFiles = getImmutableFilesFromVersionZip(version)
versionToRocksDBFiles.put(version, newResolvedFiles)
newResolvedFiles
}
files.foreach(f => fileToMaxUsedVersion(f) = version)
}
// Best effort attempt to delete SST files that were last used in to-be-deleted versions
val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }
logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
var failedToDelete = 0
filesToDelete.foreach { case (file, maxUsedVersion) =>
try {
val dfsFile = dfsFilePath(file.dfsFileName)
fm.delete(dfsFile)
logDebug(s"Deleted file $file that was last used in version $maxUsedVersion")
} catch {
case e: Exception =>
failedToDelete += 1
logWarning(s"Error deleting file $file, last used in version $maxUsedVersion", e)
}
}
// Delete the version files and forget about them
versionsToDelete.foreach { version =>
val versionFile = dfsBatchZipFile(version)
try {
fm.delete(versionFile)
versionToRocksDBFiles.remove(version)
logDebug(s"Deleted version $version")
} catch {
case e: Exception =>
logWarning(s"Error deleting version file $versionFile for version $version", e)
}
}
logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete " +
s"$failedToDelete files) not used in versions >= $minVersionToRetain")
}
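// Worked example (not in the original source): with versions 1 to 10 present and
// numVersionsToRetain = 2, minVersionToRetain is 9, so 1.zip through 8.zip are deleted
// along with every immutable file whose last-used version is <= 8.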
/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
// Get the immutable files used in previous versions, as some of those uploaded files can be
// reused for this version
logInfo(s"Saving RocksDB files to DFS for $version")
val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
f.localFileName -> f
}.toMap
var bytesCopied = 0L
var filesCopied = 0L
var filesReused = 0L
val immutableFiles = localFiles.map { localFile =>
prevFilesToSizes
.get(localFile.getName)
.filter(_.isSameFile(localFile))
.map { reusable =>
filesReused += 1
reusable
}.getOrElse {
val localFileName = localFile.getName
val dfsFileName = newDFSFileName(localFileName)
val dfsFile = dfsFilePath(dfsFileName)
// Note: The implementation of copyFromLocalFile() closes the output stream when there is
// any exception while copying. So this may generate partial files on DFS. But that is
// okay because until the main [version].zip file is written, those partial files are
// not going to be used at all. Eventually these files should get cleared.
fs.copyFromLocalFile(
new Path(localFile.getAbsoluteFile.toURI), dfsFile)
val localFileSize = localFile.length()
logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
filesCopied += 1
bytesCopied += localFileSize
RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)
saveCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
filesCopied = filesCopied,
filesReused = filesReused)
immutableFiles
}
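// Illustration (not in the original source): if version 9 uploaded 00007.sst as
// 00007-<uuid1>.sst and version 10's checkpoint again contains 00007.sst with the same size,
// the DFS copy is reused (filesReused is incremented) instead of being uploaded again.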
/**
* Copy files from a DFS directory to a local directory. It will figure out which
* existing local files can be kept: unnecessary SST files are deleted, while necessary
* files that are not already present locally are copied from DFS.
*/
private def loadImmutableFilesFromDfs(
immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
// Delete unnecessary local immutable files
listRocksDBFiles(localDir)._1
.foreach { existingFile =>
val isSameFile =
requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
if (!isSameFile) {
existingFile.delete()
logInfo(s"Deleted local file $existingFile")
}
}
var filesCopied = 0L
var bytesCopied = 0L
var filesReused = 0L
immutableFiles.foreach { file =>
val localFileName = file.localFileName
val localFile = localFilePath(localDir, localFileName)
if (!localFile.exists) {
val dfsFile = dfsFilePath(file.dfsFileName)
// Note: The implementation of copyToLocalFile() closes the output stream when there is
// any exception while copying, so this may leave partial files on local disk. That is
// okay because the size check below detects such a partial copy and fails the load.
fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
val localFileSize = localFile.length()
val expectedSize = file.sizeBytes
if (localFileSize != expectedSize) {
throw new IllegalStateException(
s"Copied $dfsFile to $localFile," +
s" expected $expectedSize bytes, found $localFileSize bytes")
}
filesCopied += 1
bytesCopied += localFileSize
logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
} else {
filesReused += 1
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
s"$filesReused files reused.")
loadCheckpointMetrics = RocksDBFileManagerMetrics(
bytesCopied = bytesCopied,
filesCopied = filesCopied,
filesReused = filesReused)
}
/** Get the SST files required for a version from the version zip file in DFS */
private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
Utils.deleteRecursively(localTempDir)
localTempDir.mkdirs()
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localTempDir)
val metadataFile = localMetadataFile(localTempDir)
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
metadata.immutableFiles
}
/**
* Compress files into a single zip file in DFS. Only the file names are embedded in the zip.
* Any error while writing cancels the atomic output stream, ensuring the file is not written.
*/
private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = {
lazy val filesStr = s"$dfsZipFile\n\t${files.mkString("\n\t")}"
var in: InputStream = null
val out = fm.createAtomic(dfsZipFile, overwriteIfPossible = true)
var totalBytes = 0L
val zout = new ZipOutputStream(out)
try {
files.foreach { file =>
zout.putNextEntry(new ZipEntry(file.getName))
in = new FileInputStream(file)
val bytes = IOUtils.copy(in, zout)
in.close()
zout.closeEntry()
totalBytes += bytes
}
zout.close() // so that any error in closing also cancels the output stream
logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
// The other fields of saveCheckpointMetrics were already filled by saveImmutableFilesToDfs
saveCheckpointMetrics =
saveCheckpointMetrics.copy(zipFileBytesUncompressed = Some(totalBytes))
} catch {
case e: Exception =>
// Cancel the actual output stream first, so that zout.close() does not write the file
out.cancel()
logError(s"Error zipping to $filesStr", e)
throw e
} finally {
// Close everything no matter what happened
IOUtils.closeQuietly(in)
IOUtils.closeQuietly(zout)
}
}
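// Note (assumption based on CheckpointFileManager semantics): createAtomic writes to a
// temporary file that is renamed to dfsZipFile only on a successful close, while cancel()
// discards it, so a failure mid-zip never leaves a readable [version].zip behind.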
/** Log the files present in a directory. This is useful for debugging. */
private def logFilesInDir(dir: File, msg: String): Unit = {
lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f =>
s"${f.getAbsolutePath} - ${f.length()} bytes"
}
logInfo(s"$msg - ${files.length} files\n\t${files.mkString("\n\t")}")
}
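/** Generate a unique DFS file name, e.g. 00007.sst becomes 00007-<random UUID>.sst. */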
private def newDFSFileName(localFileName: String): String = {
val baseName = FilenameUtils.getBaseName(localFileName)
val extension = FilenameUtils.getExtension(localFileName)
s"$baseName-${UUID.randomUUID}.$extension"
}
private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip")
private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")
override protected def logName: String = s"${super.logName} $loggingId"
private def dfsFilePath(fileName: String): Path = {
if (isSstFile(fileName)) {
new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
} else if (isLogFile(fileName)) {
new Path(new Path(dfsRootDir, LOG_FILES_DFS_SUBDIR), fileName)
} else {
new Path(dfsRootDir, fileName)
}
}
private def localFilePath(localDir: File, fileName: String): File = {
if (isLogFile(fileName)) {
new File(new File(localDir, LOG_FILES_LOCAL_SUBDIR), fileName)
} else {
new File(localDir, fileName)
}
}
/**
* List all the RocksDB files that need to be synced or recovered.
*/
private def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = {
val topLevelFiles = localDir.listFiles.filter(!_.isDirectory)
val archivedLogFiles =
Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles())
.getOrElse(Array[File]())
// To ignore .log.crc files
.filter(file => isLogFile(file.getName))
val (topLevelSstFiles, topLevelOtherFiles) = topLevelFiles.partition(f => isSstFile(f.getName))
(topLevelSstFiles ++ archivedLogFiles, topLevelOtherFiles)
}
}
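// ---------------------------------------------------------------------------------
// Usage sketch (not part of the original source). It exercises the public API of
// RocksDBFileManager against the local filesystem standing in for a real DFS; all
// directories below are temporary and the scenario (an empty version-1 checkpoint)
// is hypothetical.
// ---------------------------------------------------------------------------------
object RocksDBFileManagerExample {
  def main(args: Array[String]): Unit = {
    val dfsRootDir = Utils.createTempDir().getAbsolutePath // stands in for a DFS path
    val fileManager =
      new RocksDBFileManager(dfsRootDir, Utils.createTempDir(), new Configuration)

    // Commit an (empty) checkpoint directory as version 1. With real data this would
    // be a RocksDB checkpoint containing SST, MANIFEST, OPTIONS and CURRENT files.
    fileManager.saveCheckpointToDfs(Utils.createTempDir(), version = 1, numKeys = 0)
    assert(fileManager.getLatestVersion() == 1)

    // Restore version 1 into a fresh local directory and inspect its metadata.
    val metadata = fileManager.loadCheckpointFromDfs(1, Utils.createTempDir())
    println(metadata.prettyJson)

    // Keep only the two most recent versions (a no-op while only version 1 exists).
    fileManager.deleteOldVersions(numVersionsToRetain = 2)
  }
}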
/**
* Metrics regarding RocksDB file sync between local and DFS.
*/
case class RocksDBFileManagerMetrics(
filesCopied: Long,
bytesCopied: Long,
filesReused: Long,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
zipFileBytesUncompressed: Option[Long] = None)
/**
* Metrics to return when requested but no operation has been performed.
*/
object RocksDBFileManagerMetrics {
val EMPTY_METRICS = RocksDBFileManagerMetrics(0L, 0L, 0L, None)
}
/**
* Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
* changes to this MUST be backward-compatible.
*/
case class RocksDBCheckpointMetadata(
sstFiles: Seq[RocksDBSstFile],
logFiles: Seq[RocksDBLogFile],
numKeys: Long) {
import RocksDBCheckpointMetadata._
def json: String = {
// We set this field to null to avoid writing an empty logFiles field in the JSON.
val nullified = if (logFiles.isEmpty) this.copy(logFiles = null) else this
mapper.writeValueAsString(nullified)
}
def prettyJson: String = Serialization.writePretty(this)(RocksDBCheckpointMetadata.format)
def writeToFile(metadataFile: File): Unit = {
val writer = Files.newBufferedWriter(metadataFile.toPath, UTF_8)
try {
writer.write(s"v$VERSION\n")
writer.write(this.json)
} finally {
writer.close()
}
}
def immutableFiles: Seq[RocksDBImmutableFile] = sstFiles ++ logFiles
}
/** Helper class for [[RocksDBCheckpointMetadata]] */
object RocksDBCheckpointMetadata {
val VERSION = 1
implicit val format = Serialization.formats(NoTypeHints)
/** Used to convert between classes and JSON. */
lazy val mapper = {
val _mapper = new ObjectMapper with ClassTagExtensions
_mapper.setSerializationInclusion(Include.NON_ABSENT)
_mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
_mapper.registerModule(DefaultScalaModule)
_mapper
}
def readFromFile(metadataFile: File): RocksDBCheckpointMetadata = {
val reader = Files.newBufferedReader(metadataFile.toPath, UTF_8)
try {
val versionLine = reader.readLine()
if (versionLine != s"v$VERSION") {
throw new IllegalStateException(
s"Cannot read RocksDB checkpoint metadata of version $versionLine")
}
Serialization.read[RocksDBCheckpointMetadata](reader)
} finally {
reader.close()
}
}
def apply(rocksDBFiles: Seq[RocksDBImmutableFile], numKeys: Long): RocksDBCheckpointMetadata = {
val sstFiles = rocksDBFiles.collect { case file: RocksDBSstFile => file }
val logFiles = rocksDBFiles.collect { case file: RocksDBLogFile => file }
RocksDBCheckpointMetadata(sstFiles, logFiles, numKeys)
}
}
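// ---------------------------------------------------------------------------------
// Sketch (not part of the original source): builds the metadata stored inside a
// [version].zip for one SST file and one archived log file, and round-trips it
// through the versioned on-disk format ("v1" header followed by JSON). The
// UUID-suffixed DFS names are made up for illustration.
// ---------------------------------------------------------------------------------
object RocksDBCheckpointMetadataExample {
  def main(args: Array[String]): Unit = {
    val metadata = RocksDBCheckpointMetadata(
      sstFiles = Seq(RocksDBSstFile("00007.sst", "00007-uuid1.sst", 1024L)),
      logFiles = Seq(RocksDBLogFile("00008.log", "00008-uuid3.log", 512L)),
      numKeys = 100L)
    println(metadata.prettyJson) // JSON with sstFiles, logFiles and numKeys fields

    // Write and re-read the metadata file, as done for the `metadata` zip entry.
    val file = Files.createTempFile("metadata", "").toFile
    metadata.writeToFile(file)
    assert(RocksDBCheckpointMetadata.readFromFile(file) == metadata)
  }
}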
/**
* A RocksDBImmutableFile maintains a mapping between a local RocksDB file name and the name of
* its copy on DFS. Since these files are immutable, their DFS copies can be reused.
*/
sealed trait RocksDBImmutableFile {
def localFileName: String
def dfsFileName: String
def sizeBytes: Long
/**
* Whether another local file is the same as the file described by this class.
* Two files are considered the same only when both the name and the size match.
*/
def isSameFile(otherFile: File): Boolean = {
otherFile.getName == localFileName && otherFile.length() == sizeBytes
}
}
/**
* Class to represent a RocksDB SST file. Since this is converted to JSON,
* any changes to these MUST be backward-compatible.
*/
private[sql] case class RocksDBSstFile(
localFileName: String,
dfsSstFileName: String,
sizeBytes: Long) extends RocksDBImmutableFile {
override def dfsFileName: String = dfsSstFileName
}
/**
* Class to represent a RocksDB Log file. Since this is converted to JSON,
* any changes to these MUST be backward-compatible.
*/
private[sql] case class RocksDBLogFile(
localFileName: String,
dfsLogFileName: String,
sizeBytes: Long) extends RocksDBImmutableFile {
override def dfsFileName: String = dfsLogFileName
}
object RocksDBImmutableFile {
val SST_FILES_DFS_SUBDIR = "SSTs"
val LOG_FILES_DFS_SUBDIR = "logs"
val LOG_FILES_LOCAL_SUBDIR = "archive"
def apply(localFileName: String, dfsFileName: String, sizeBytes: Long): RocksDBImmutableFile = {
if (isSstFile(localFileName)) {
RocksDBSstFile(localFileName, dfsFileName, sizeBytes)
} else if (isLogFile(localFileName)) {
RocksDBLogFile(localFileName, dfsFileName, sizeBytes)
} else {
null
}
}
def isSstFile(fileName: String): Boolean = fileName.endsWith(".sst")
def isLogFile(fileName: String): Boolean = fileName.endsWith(".log")
private def isArchivedLogFile(file: File): Boolean =
isLogFile(file.getName) && file.getParentFile.getName == LOG_FILES_LOCAL_SUBDIR
def isImmutableFile(file: File): Boolean = isSstFile(file.getName) || isArchivedLogFile(file)
}
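// ---------------------------------------------------------------------------------
// Sketch (not part of the original source): how RocksDBImmutableFile.apply classifies
// local file names. A name that is neither an SST nor a log file yields null.
// ---------------------------------------------------------------------------------
object RocksDBImmutableFileExample {
  def main(args: Array[String]): Unit = {
    assert(RocksDBImmutableFile("00007.sst", "00007-uuid1.sst", 1024L).isInstanceOf[RocksDBSstFile])
    assert(RocksDBImmutableFile("00008.log", "00008-uuid3.log", 512L).isInstanceOf[RocksDBLogFile])
    assert(RocksDBImmutableFile("MANIFEST-000008", "MANIFEST-000008", 64L) == null)
    assert(RocksDBImmutableFile.isSstFile("00011.sst") && RocksDBImmutableFile.isLogFile("00013.log"))
  }
}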