Files
PandaX/apps/devops/services/k8s/job/job.go
2022-01-22 17:07:04 +08:00

254 lines
7.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package job
import (
"context"
"fmt"
"pandax/base/global"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"pandax/apps/devops/entity/k8s"
k8scommon "pandax/apps/devops/services/k8s/common"
"pandax/apps/devops/services/k8s/dataselect"
"pandax/apps/devops/services/k8s/event"
)
// JobList contains a list of Jobs in the cluster.
type JobList struct {
	// ListMeta holds list-level metadata; TotalItems reflects the count
	// after filtering (see ToJobList).
	ListMeta k8s.ListMeta `json:"listMeta"`
	// Basic information about resources status on the list.
	Status k8scommon.ResourceStatus `json:"status"`
	// Unordered list of Jobs.
	Jobs []Job `json:"jobs"`
	// List of non-critical errors, that occurred during resource retrieval.
	Errors []error `json:"errors"`
}
// JobStatusType is a short, machine-readable code describing a job's
// overall execution state, as inferred by GetJobStatus.
type JobStatusType string

const (
	// JobStatusRunning means the job is still running.
	JobStatusRunning JobStatusType = "Running"
	// JobStatusComplete means the job has completed its execution.
	JobStatusComplete JobStatusType = "Complete"
	// JobStatusFailed means the job has failed its execution.
	JobStatusFailed JobStatusType = "Failed"
)
// JobStatus summarizes a job's state inferred from its conditions.
type JobStatus struct {
	// Short, machine understandable job status code.
	Status JobStatusType `json:"status"`
	// A human-readable description of the status of related job.
	Message string `json:"message"`
	// Conditions describe the state of a job after it finishes.
	Conditions []k8scommon.Condition `json:"conditions"`
}
// Job is a presentation layer view of Kubernetes Job resource. This means it is Job plus additional
// augmented data we can get from other sources
type Job struct {
	ObjectMeta k8s.ObjectMeta `json:"objectMeta"`
	TypeMeta   k8s.TypeMeta   `json:"typeMeta"`
	// Aggregate information about pods belonging to this Job.
	Pods k8scommon.PodInfo `json:"podInfo"`
	// Container images of the Job.
	ContainerImages []string `json:"containerImages"`
	// Init Container images of the Job.
	InitContainerImages []string `json:"initContainerImages"`
	// number of parallel jobs defined.
	Parallelism *int32 `json:"parallelism"`
	// JobStatus contains inferred job status based on job conditions
	JobStatus JobStatus `json:"jobStatus"`
	// PodStatus mirrors the pod counters and timestamps from the job's status.
	PodStatus PodStatus `json:"podStatus"`
}
// PodStatus mirrors the pod-level counters and timing information reported
// on a Kubernetes Job's status (see GetPodStatus).
type PodStatus struct {
	// Represents time when the job controller started processing a job. When a
	// Job is created in the suspended state, this field is not set until the
	// first time it is resumed. This field is reset every time a Job is resumed
	// from suspension. It is represented in RFC3339 form and is in UTC.
	// +optional
	StartTime *meta.Time `json:"startTime"`
	// Represents time when the job was completed. It is not guaranteed to
	// be set in happens-before order across separate operations.
	// It is represented in RFC3339 form and is in UTC.
	// The completion time is only set when the job finishes successfully.
	// +optional
	CompletionTime *meta.Time `json:"completionTime"`
	// The number of actively running pods.
	// +optional
	Active int32 `json:"active"`
	// The number of pods which reached phase Succeeded.
	// +optional
	Succeeded int32 `json:"succeeded"`
	// The number of pods which reached phase Failed.
	// +optional
	Failed int32 `json:"failed"`
}
// GetJobList returns all Jobs in the namespaces selected by nsQuery, shaped
// by dsQuery (filtering, sorting, pagination). Jobs, pods and events are
// fetched concurrently via resource channels.
func GetJobList(client *kubernetes.Clientset, nsQuery *k8scommon.NamespaceQuery, dsQuery *dataselect.DataSelectQuery) (*JobList, error) {
	global.Log.Info("Getting list of all jobs in the cluster")
	ch := &k8scommon.ResourceChannels{
		JobList:   k8scommon.GetJobListChannel(client, nsQuery, 1),
		PodList:   k8scommon.GetPodListChannel(client, nsQuery, 1),
		EventList: k8scommon.GetEventListChannel(client, nsQuery, 1),
	}
	return GetJobListFromChannels(ch, dsQuery)
}
// GetJobListFromChannels returns a list of all Jobs in the cluster, reading
// each required resource list exactly once from the supplied channels.
// Any fetch error aborts the whole listing.
func GetJobListFromChannels(channels *k8scommon.ResourceChannels, dsQuery *dataselect.DataSelectQuery) (*JobList, error) {
	// For each resource, the list must be received before its error —
	// keep the original receive order.
	jobs := <-channels.JobList.List
	if err := <-channels.JobList.Error; err != nil {
		return nil, err
	}

	pods := <-channels.PodList.List
	if err := <-channels.PodList.Error; err != nil {
		return nil, err
	}

	events := <-channels.EventList.List
	if err := <-channels.EventList.Error; err != nil {
		return nil, err
	}

	result := ToJobList(jobs.Items, pods.Items, events.Items, dsQuery)
	result.Status = getStatus(jobs, pods.Items)
	return result, nil
}
// ToJobList converts raw Kubernetes Job, Pod and Event lists into the
// presentation-layer JobList. dsQuery is applied to the jobs first
// (filter/sort/paginate); each surviving job is augmented with aggregated
// pod info and event warnings.
//
// Fix: the original assigned ListMeta twice — once at construction with the
// unfiltered count, then unconditionally overwrote it with filteredTotal.
// The dead store is removed and ListMeta is set exactly once.
func ToJobList(jobs []batch.Job, pods []v1.Pod, events []v1.Event, dsQuery *dataselect.DataSelectQuery) *JobList {
	jobCells, filteredTotal := dataselect.GenericDataSelectWithFilter(ToCells(jobs), dsQuery)
	jobs = FromCells(jobCells)

	jobList := &JobList{
		// Pre-sized for the filtered page; kept non-nil so the JSON field
		// encodes as [] rather than null.
		Jobs:     make([]Job, 0, len(jobs)),
		ListMeta: k8s.ListMeta{TotalItems: filteredTotal},
	}

	for _, job := range jobs {
		matchingPods := k8scommon.FilterPodsForJob(job, pods)
		podInfo := k8scommon.GetPodInfo(job.Status.Active, job.Spec.Completions, matchingPods)
		podInfo.Warnings = event.GetPodsEventWarnings(events, matchingPods)
		jobList.Jobs = append(jobList.Jobs, toJob(&job, &podInfo))
	}
	return jobList
}
// toJob maps a Kubernetes Job plus its pre-aggregated pod info onto the
// presentation-layer Job type.
func toJob(job *batch.Job, podInfo *k8scommon.PodInfo) Job {
	podSpec := &job.Spec.Template.Spec
	return Job{
		ObjectMeta:          k8s.NewObjectMeta(job.ObjectMeta),
		TypeMeta:            k8s.NewTypeMeta(k8s.ResourceKindJob),
		ContainerImages:     k8scommon.GetContainerImages(podSpec),
		InitContainerImages: k8scommon.GetInitContainerImages(podSpec),
		Pods:                *podInfo,
		JobStatus:           GetJobStatus(job),
		PodStatus:           GetPodStatus(job),
		Parallelism:         job.Spec.Parallelism,
	}
}
// GetJobStatus infers a high-level status for the job from its conditions:
// Complete or Failed when the matching condition is true, otherwise Running.
// The condition message is propagated only for failed jobs.
func GetJobStatus(job *batch.Job) JobStatus {
	status := JobStatus{Status: JobStatusRunning, Conditions: getJobConditions(job)}
	for _, cond := range job.Status.Conditions {
		if cond.Status != v1.ConditionTrue {
			continue
		}
		switch cond.Type {
		case batch.JobComplete:
			status.Status = JobStatusComplete
			return status
		case batch.JobFailed:
			status.Status = JobStatusFailed
			status.Message = cond.Message
			return status
		}
	}
	return status
}
// getJobConditions converts the job's native conditions into the shared
// k8scommon.Condition representation. Returns nil when the job has no
// conditions (so the JSON field stays null, matching prior behavior).
func getJobConditions(job *batch.Job) []k8scommon.Condition {
	var out []k8scommon.Condition
	for i := range job.Status.Conditions {
		c := &job.Status.Conditions[i]
		out = append(out, k8scommon.Condition{
			Type:               string(c.Type),
			Status:             meta.ConditionStatus(c.Status),
			LastProbeTime:      c.LastProbeTime,
			LastTransitionTime: c.LastTransitionTime,
			Reason:             c.Reason,
			Message:            c.Message,
		})
	}
	return out
}
// GetPodStatus extracts the pod-level counters and timing information
// recorded on the job's status.
func GetPodStatus(job *batch.Job) PodStatus {
	s := job.Status
	return PodStatus{
		StartTime:      s.StartTime,
		CompletionTime: s.CompletionTime,
		Active:         s.Active,
		Succeeded:      s.Succeeded,
		Failed:         s.Failed,
	}
}
// DeleteJob removes the named Job from the given namespace.
func DeleteJob(client *kubernetes.Clientset, namespace, name string) (err error) {
	jobs := client.BatchV1().Jobs(namespace)
	return jobs.Delete(context.TODO(), name, meta.DeleteOptions{})
}
// DeleteCollectionJob deletes every Job named in jobList, one at a time.
// It stops and returns on the first deletion error (already-processed jobs
// stay deleted); returns nil when all jobs were removed.
//
// Fix: the log format string "delete job%v" ran the job name into the
// label; a separator is added ("delete job: %v").
func DeleteCollectionJob(client *kubernetes.Clientset, jobList []k8s.JobData) (err error) {
	global.Log.Info("批量删除job开始")
	for _, v := range jobList {
		global.Log.Info(fmt.Sprintf("delete job: %v, ns: %v", v.Name, v.Namespace))
		if err := client.BatchV1().Jobs(v.Namespace).Delete(
			context.TODO(),
			v.Name,
			meta.DeleteOptions{},
		); err != nil {
			global.Log.Error(err.Error())
			return err
		}
	}
	global.Log.Info("删除job已完成")
	return nil
}
// ScaleJob sets the job's parallelism to scaleNumber: it fetches the
// current Job object, mutates Spec.Parallelism, and writes it back.
// Returns any error from the Get or Update call.
func ScaleJob(client *kubernetes.Clientset, namespace, name string, scaleNumber *int32) (err error) {
	jobs := client.BatchV1().Jobs(namespace)
	job, err := jobs.Get(context.TODO(), name, meta.GetOptions{})
	if err != nil {
		return err
	}
	job.Spec.Parallelism = scaleNumber
	_, err = jobs.Update(context.TODO(), job, meta.UpdateOptions{})
	return err
}