kafka CoordinatorRestResource 源码

  • 2022-10-20
  • 浏览 (329)

kafka CoordinatorRestResource 代码

文件路径:/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.trogdor.coordinator;

import org.apache.kafka.trogdor.rest.CoordinatorShutdownRequest;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.UptimeResponse;

import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
 * JAX-RS resource exposing the Coordinator's RPCs under the {@code /coordinator} path.
 * Every endpoint consumes and produces JSON.
 *
 * <p>RPCs should be idempotent: when the server's response is lost, the client simply
 * resends the same request, so answering it a second time must give the same result.
 *
 * <p>RPCs that have no results return the empty JSON object {} instead of void. That
 * leaves room to add result fields later in a compatible way.
 */
@Path("/coordinator")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class CoordinatorRestResource {
    /** The coordinator that serves requests; holds null until {@link #setCoordinator} is called. */
    private final AtomicReference<Coordinator> coordinator = new AtomicReference<>();

    /** Servlet context, annotated for injection; not read by any method of this class. */
    @javax.ws.rs.core.Context
    private ServletContext context;

    /**
     * Installs the coordinator that all endpoints delegate to.
     *
     * @param myCoordinator the coordinator to use from now on
     */
    public void setCoordinator(Coordinator myCoordinator) {
        coordinator.set(myCoordinator);
    }

    /**
     * GET /coordinator/status
     *
     * @return whatever {@code Coordinator#status()} returns
     */
    @GET
    @Path("/status")
    public CoordinatorStatusResponse status() throws Throwable {
        Coordinator target = coordinator();
        return target.status();
    }

    /**
     * GET /coordinator/uptime
     *
     * @return whatever {@code Coordinator#uptime()} returns
     */
    @GET
    @Path("/uptime")
    public UptimeResponse uptime() {
        Coordinator target = coordinator();
        return target.uptime();
    }

    /**
     * POST /coordinator/task/create
     *
     * @param request the request body, handed to {@code Coordinator#createTask}
     * @return the empty JSON object
     */
    @POST
    @Path("/task/create")
    public Empty createTask(CreateTaskRequest request) throws Throwable {
        Coordinator target = coordinator();
        target.createTask(request);
        return Empty.INSTANCE;
    }

    /**
     * PUT /coordinator/task/stop
     *
     * @param request the request body, handed to {@code Coordinator#stopTask}
     * @return the empty JSON object
     */
    @PUT
    @Path("/task/stop")
    public Empty stopTask(StopTaskRequest request) throws Throwable {
        Coordinator target = coordinator();
        target.stopTask(request);
        return Empty.INSTANCE;
    }

    /**
     * DELETE /coordinator/tasks
     *
     * @param taskId the {@code taskId} query parameter (empty string when absent), wrapped
     *               in a {@link DestroyTaskRequest}
     * @return the empty JSON object
     */
    @DELETE
    @Path("/tasks")
    public Empty destroyTask(@DefaultValue("") @QueryParam("taskId") String taskId) throws Throwable {
        Coordinator target = coordinator();
        target.destroyTask(new DestroyTaskRequest(taskId));
        return Empty.INSTANCE;
    }

    /**
     * GET /coordinator/tasks/
     *
     * <p>Builds a {@link TasksRequest} from the query parameters and returns the
     * coordinator's answer. The four timestamp parameters default to 0 and are passed
     * through to the request unchanged.
     *
     * @param taskId       values of the repeated {@code taskId} query parameter
     * @param firstStartMs the {@code firstStartMs} query parameter
     * @param lastStartMs  the {@code lastStartMs} query parameter
     * @param firstEndMs   the {@code firstEndMs} query parameter
     * @param lastEndMs    the {@code lastEndMs} query parameter
     * @param state        the {@code state} query parameter; the empty string (the default)
     *                     results in an empty Optional being passed to the request,
     *                     otherwise it must be one of {@code TaskStateType.Constants.VALUES}
     * @return a 200 response carrying the {@link TasksResponse}, or a 400 response with an
     *         explanatory message when {@code state} is not a recognized value
     */
    @GET
    @Path("/tasks/")
    public Response tasks(@QueryParam("taskId") List<String> taskId,
            @DefaultValue("0") @QueryParam("firstStartMs") long firstStartMs,
            @DefaultValue("0") @QueryParam("lastStartMs") long lastStartMs,
            @DefaultValue("0") @QueryParam("firstEndMs") long firstEndMs,
            @DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs,
            @DefaultValue("") @QueryParam("state") String state) throws Throwable {
        Optional<TaskStateType> givenState;
        if (state.isEmpty()) {
            // No state filter was supplied.
            givenState = Optional.empty();
        } else if (TaskStateType.Constants.VALUES.contains(state)) {
            givenState = Optional.of(TaskStateType.valueOf(state));
        } else {
            String message = String.format("State %s is invalid. Must be one of %s",
                state, TaskStateType.Constants.VALUES);
            return Response.status(Response.Status.BAD_REQUEST).entity(message).build();
        }

        // Resolve the coordinator before building the request so an uninitialized
        // resource fails first.
        Coordinator target = coordinator();
        TasksResponse resp = target.tasks(
            new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs, givenState));
        return Response.status(Response.Status.OK).entity(resp).build();
    }

    /**
     * GET /coordinator/tasks/{taskId}
     *
     * @param taskId the task ID taken from the request path
     * @return the state the coordinator reports for that task
     * @throws NotFoundException if the coordinator returns null for the given ID
     */
    @GET
    @Path("/tasks/{taskId}")
    public TaskState tasks(@PathParam("taskId") String taskId) throws Throwable {
        Coordinator target = coordinator();
        TaskState found = target.task(new TaskRequest(taskId));
        if (found != null) {
            return found;
        }
        throw new NotFoundException(String.format("No task with ID \"%s\" exists.", taskId));
    }

    /**
     * PUT /coordinator/shutdown
     *
     * @param request the request body; its {@code stopAgents()} value is forwarded to
     *                {@code Coordinator#beginShutdown}
     * @return the empty JSON object
     */
    @PUT
    @Path("/shutdown")
    public Empty beginShutdown(CoordinatorShutdownRequest request) throws Throwable {
        Coordinator target = coordinator();
        target.beginShutdown(request.stopAgents());
        return Empty.INSTANCE;
    }

    /**
     * Fetches the installed coordinator.
     *
     * @return the coordinator previously passed to {@link #setCoordinator}
     * @throws RuntimeException if no coordinator has been set yet
     */
    private Coordinator coordinator() {
        Coordinator current = coordinator.get();
        if (current != null) {
            return current;
        }
        throw new RuntimeException("CoordinatorRestResource has not been initialized yet.");
    }
}

相关信息

kafka 源码目录

相关文章

kafka Coordinator 源码

kafka CoordinatorClient 源码

kafka NodeManager 源码

kafka TaskManager 源码

0  赞