【修改】k8s 配置

This commit is contained in:
PandaGoAdmin
2022-01-22 17:07:04 +08:00
parent c6ebe89865
commit 33cc74711d
439 changed files with 9936 additions and 21687 deletions

View File

@@ -0,0 +1,253 @@
package job
import (
"context"
"fmt"
"pandax/base/global"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"pandax/apps/devops/entity/k8s"
k8scommon "pandax/apps/devops/services/k8s/common"
"pandax/apps/devops/services/k8s/dataselect"
"pandax/apps/devops/services/k8s/event"
)
// JobList contains a list of Jobs in the cluster.
type JobList struct {
	// ListMeta carries list-level metadata (the total item count after filtering).
	ListMeta k8s.ListMeta `json:"listMeta"`

	// Basic information about resources status on the list.
	Status k8scommon.ResourceStatus `json:"status"`

	// Unordered list of Jobs.
	Jobs []Job `json:"jobs"`

	// List of non-critical errors, that occurred during resource retrieval.
	Errors []error `json:"errors"`
}
// JobStatusType is a machine-readable classification of a job's state.
type JobStatusType string

const (
	// JobStatusRunning means the job is still running.
	JobStatusRunning JobStatusType = "Running"

	// JobStatusComplete means the job has completed its execution.
	JobStatusComplete JobStatusType = "Complete"

	// JobStatusFailed means the job has failed its execution.
	JobStatusFailed JobStatusType = "Failed"
)
// JobStatus is the presentation-layer status of a single job.
type JobStatus struct {
	// Short, machine understandable job status code.
	Status JobStatusType `json:"status"`

	// A human-readable description of the status of related job.
	// Populated from the failing condition's message (see GetJobStatus).
	Message string `json:"message"`

	// Conditions describe the state of a job after it finishes.
	Conditions []k8scommon.Condition `json:"conditions"`
}
// Job is a presentation layer view of Kubernetes Job resource. This means it is Job plus additional
// augmented data we can get from other sources
type Job struct {
	ObjectMeta k8s.ObjectMeta `json:"objectMeta"`
	TypeMeta   k8s.TypeMeta   `json:"typeMeta"`

	// Aggregate information about pods belonging to this Job.
	Pods k8scommon.PodInfo `json:"podInfo"`

	// Container images of the Job.
	ContainerImages []string `json:"containerImages"`

	// Init Container images of the Job.
	InitContainerImages []string `json:"initContainerImages"`

	// number of parallel jobs defined.
	Parallelism *int32 `json:"parallelism"`

	// JobStatus contains inferred job status based on job conditions
	JobStatus JobStatus `json:"jobStatus"`

	// PodStatus carries pod counters and timestamps copied from the job's status.
	PodStatus PodStatus `json:"podStatus"`
}
// PodStatus summarizes pod-level counters and timestamps taken from a
// Kubernetes Job's status block (see GetPodStatus).
type PodStatus struct {
	// Represents time when the job controller started processing a job. When a
	// Job is created in the suspended state, this field is not set until the
	// first time it is resumed. This field is reset every time a Job is resumed
	// from suspension. It is represented in RFC3339 form and is in UTC.
	// +optional
	StartTime *meta.Time `json:"startTime"`

	// Represents time when the job was completed. It is not guaranteed to
	// be set in happens-before order across separate operations.
	// It is represented in RFC3339 form and is in UTC.
	// The completion time is only set when the job finishes successfully.
	// +optional
	CompletionTime *meta.Time `json:"completionTime"`

	// The number of actively running pods.
	// +optional
	Active int32 `json:"active"`

	// The number of pods which reached phase Succeeded.
	// +optional
	Succeeded int32 `json:"succeeded"`

	// The number of pods which reached phase Failed.
	// +optional
	Failed int32 `json:"failed"`
}
// GetJobList returns a list of all Jobs in the cluster, restricted by the
// namespace query and shaped (filtered/sorted/paginated) by dsQuery.
func GetJobList(client *kubernetes.Clientset, nsQuery *k8scommon.NamespaceQuery, dsQuery *dataselect.DataSelectQuery) (*JobList, error) {
	global.Log.Info("Getting list of all jobs in the cluster")

	// Kick off the three list requests concurrently; each channel is buffered
	// for exactly one result.
	resourceChannels := &k8scommon.ResourceChannels{
		JobList:   k8scommon.GetJobListChannel(client, nsQuery, 1),
		PodList:   k8scommon.GetPodListChannel(client, nsQuery, 1),
		EventList: k8scommon.GetEventListChannel(client, nsQuery, 1),
	}
	return GetJobListFromChannels(resourceChannels, dsQuery)
}
// GetJobListFromChannels returns a list of all Jobs in the cluster, reading
// each required resource list exactly once from the supplied channels.
func GetJobListFromChannels(channels *k8scommon.ResourceChannels, dsQuery *dataselect.DataSelectQuery) (*JobList, error) {
	jobs := <-channels.JobList.List
	if err := <-channels.JobList.Error; err != nil {
		return nil, err
	}

	pods := <-channels.PodList.List
	if err := <-channels.PodList.Error; err != nil {
		return nil, err
	}

	events := <-channels.EventList.List
	if err := <-channels.EventList.Error; err != nil {
		return nil, err
	}

	result := ToJobList(jobs.Items, pods.Items, events.Items, dsQuery)
	result.Status = getStatus(jobs, pods.Items)
	return result, nil
}
// ToJobList converts the raw job/pod/event lists into the presentation-layer
// JobList, applying dsQuery (filter/sort/paginate) first.
func ToJobList(jobs []batch.Job, pods []v1.Pod, events []v1.Event, dsQuery *dataselect.DataSelectQuery) *JobList {
	jobCells, filteredTotal := dataselect.GenericDataSelectWithFilter(ToCells(jobs), dsQuery)
	jobs = FromCells(jobCells)

	// TotalItems reflects the filtered count. (The original assigned
	// ListMeta twice; the first assignment with len(jobs) was a dead store.)
	jobList := &JobList{
		Jobs:     make([]Job, 0, len(jobs)),
		ListMeta: k8s.ListMeta{TotalItems: filteredTotal},
	}

	for i := range jobs {
		job := jobs[i]
		matchingPods := k8scommon.FilterPodsForJob(job, pods)
		podInfo := k8scommon.GetPodInfo(job.Status.Active, job.Spec.Completions, matchingPods)
		podInfo.Warnings = event.GetPodsEventWarnings(events, matchingPods)
		jobList.Jobs = append(jobList.Jobs, toJob(&job, &podInfo))
	}
	return jobList
}
// toJob builds the presentation-layer Job view from the API object and the
// aggregated pod information.
func toJob(job *batch.Job, podInfo *k8scommon.PodInfo) Job {
	view := Job{
		ObjectMeta:          k8s.NewObjectMeta(job.ObjectMeta),
		TypeMeta:            k8s.NewTypeMeta(k8s.ResourceKindJob),
		Pods:                *podInfo,
		ContainerImages:     k8scommon.GetContainerImages(&job.Spec.Template.Spec),
		InitContainerImages: k8scommon.GetInitContainerImages(&job.Spec.Template.Spec),
		Parallelism:         job.Spec.Parallelism,
	}
	view.JobStatus = GetJobStatus(job)
	view.PodStatus = GetPodStatus(job)
	return view
}
// GetJobStatus infers the presentation-layer status of a job from its
// recorded conditions: Complete or Failed when a matching condition is true,
// Running otherwise. For a failed job the condition message is surfaced.
func GetJobStatus(job *batch.Job) JobStatus {
	status := JobStatus{Status: JobStatusRunning, Conditions: getJobConditions(job)}
	for _, c := range job.Status.Conditions {
		if c.Status != v1.ConditionTrue {
			continue
		}
		switch c.Type {
		case batch.JobComplete:
			status.Status = JobStatusComplete
			return status
		case batch.JobFailed:
			status.Status = JobStatusFailed
			status.Message = c.Message
			return status
		}
	}
	return status
}
// getJobConditions converts the job's API conditions into the
// presentation-layer Condition type.
func getJobConditions(job *batch.Job) []k8scommon.Condition {
	// Deliberately left nil when there are no conditions so the JSON
	// output stays identical (null, not []).
	var out []k8scommon.Condition
	for i := range job.Status.Conditions {
		c := &job.Status.Conditions[i]
		out = append(out, k8scommon.Condition{
			Type:               string(c.Type),
			Status:             meta.ConditionStatus(c.Status),
			LastProbeTime:      c.LastProbeTime,
			LastTransitionTime: c.LastTransitionTime,
			Reason:             c.Reason,
			Message:            c.Message,
		})
	}
	return out
}
// GetPodStatus copies the pod-level counters and timestamps out of the
// job's status block into the presentation-layer PodStatus.
func GetPodStatus(job *batch.Job) PodStatus {
	s := job.Status
	return PodStatus{
		StartTime:      s.StartTime,
		CompletionTime: s.CompletionTime,
		Active:         s.Active,
		Succeeded:      s.Succeeded,
		Failed:         s.Failed,
	}
}
// DeleteJob removes the named job from the given namespace.
// (Dropped the unused named return parameter; the function type is unchanged.)
func DeleteJob(client *kubernetes.Clientset, namespace, name string) error {
	return client.BatchV1().Jobs(namespace).Delete(context.TODO(), name, meta.DeleteOptions{})
}
// DeleteCollectionJob deletes every job in jobList, stopping at and
// returning the first deletion error encountered.
func DeleteCollectionJob(client *kubernetes.Clientset, jobList []k8s.JobData) error {
	global.Log.Info("批量删除job开始")
	for _, v := range jobList {
		// Fixed log format: was "delete job%v" (name fused to the label).
		global.Log.Info(fmt.Sprintf("delete job: %v, ns: %v", v.Name, v.Namespace))
		if err := client.BatchV1().Jobs(v.Namespace).Delete(
			context.TODO(),
			v.Name,
			meta.DeleteOptions{},
		); err != nil {
			global.Log.Error(err.Error())
			return err
		}
	}
	global.Log.Info("删除job已完成")
	return nil
}
// ScaleJob sets the job's parallelism to scaleNumber and writes the change
// back via an Update call.
func ScaleJob(client *kubernetes.Clientset, namespace, name string, scaleNumber *int32) error {
	job, err := client.BatchV1().Jobs(namespace).Get(context.TODO(), name, meta.GetOptions{})
	if err != nil {
		return err
	}

	job.Spec.Parallelism = scaleNumber

	// Return the Update error directly; the original's trailing
	// `if err != nil { return err }; return nil` was redundant.
	_, err = client.BatchV1().Jobs(namespace).Update(context.TODO(), job, meta.UpdateOptions{})
	return err
}

View File

@@ -0,0 +1,67 @@
package job
import (
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"pandax/apps/devops/services/k8s/common"
"pandax/apps/devops/services/k8s/dataselect"
)
// The code below allows complex data selection (filter/sort/paginate) to be
// performed on []batch.Job by adapting each Job to dataselect.DataCell.
type JobCell batch.Job
// GetProperty returns the comparable value of the named property, or nil for
// unsupported properties (sorting on those then has no effect).
// Receiver renamed from the non-idiomatic `self` to a short `c`.
func (c JobCell) GetProperty(name dataselect.PropertyName) dataselect.ComparableValue {
	switch name {
	case dataselect.NameProperty:
		return dataselect.StdComparableString(c.ObjectMeta.Name)
	case dataselect.CreationTimestampProperty:
		return dataselect.StdComparableTime(c.ObjectMeta.CreationTimestamp.Time)
	case dataselect.NamespaceProperty:
		return dataselect.StdComparableString(c.ObjectMeta.Namespace)
	default:
		// if name is not supported then just return a constant dummy value, sort will have no effect.
		return nil
	}
}
// ToCells wraps each job as a DataCell so the generic data selector can
// operate on the list.
func ToCells(jobs []batch.Job) []dataselect.DataCell {
	cells := make([]dataselect.DataCell, 0, len(jobs))
	for _, j := range jobs {
		cells = append(cells, JobCell(j))
	}
	return cells
}
// FromCells unwraps the selected DataCells back into a []batch.Job.
func FromCells(cells []dataselect.DataCell) []batch.Job {
	jobs := make([]batch.Job, len(cells))
	for i, cell := range cells {
		jobs[i] = batch.Job(cell.(JobCell))
	}
	return jobs
}
// getStatus aggregates per-job counts (failed/succeeded/running/pending)
// over the whole job list. A nil list yields a zero-valued status.
func getStatus(list *batch.JobList, pods []v1.Pod) common.ResourceStatus {
	var info common.ResourceStatus
	if list == nil {
		return info
	}

	for i := range list.Items {
		job := list.Items[i]
		matchingPods := common.FilterPodsForJob(job, pods)
		podInfo := common.GetPodInfo(job.Status.Active, job.Spec.Completions, matchingPods)

		switch status := GetJobStatus(&job); {
		case status.Status == JobStatusFailed:
			info.Failed++
		case status.Status == JobStatusComplete:
			info.Succeeded++
		case podInfo.Running > 0:
			info.Running++
		default:
			info.Pending++
		}
	}
	return info
}

View File

@@ -0,0 +1,46 @@
package job
import (
"context"
batch "k8s.io/api/batch/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"pandax/apps/devops/services/k8s/common"
)
// JobDetail is a presentation layer view of Kubernetes Job resource.
type JobDetail struct {
	// Extends list item structure.
	Job `json:",inline"`

	// Status is the raw list of job conditions from the API object.
	Status []batch.JobCondition `json:"status"`

	// Completions specifies the desired number of successfully finished pods the job should be run with.
	Completions *int32 `json:"completions"`

	// PodList holds the pods selected by this job's label selector.
	PodList *PodList `json:"podList"`
}
// GetJobDetail fetches the named job and assembles its detail view,
// including aggregated pod information.
func GetJobDetail(client *kubernetes.Clientset, namespace, name string) (*JobDetail, error) {
	jobData, err := client.BatchV1().Jobs(namespace).Get(context.TODO(), name, metaV1.GetOptions{})
	if err != nil {
		return nil, err
	}

	podInfo, err := getJobPodInfo(client, jobData)
	if err != nil {
		return nil, err
	}

	detail := toJobDetail(client, jobData, *podInfo)
	return &detail, nil
}
// toJobDetail combines the list-item view with condition, completion and
// pod-list details for a single job.
func toJobDetail(client *kubernetes.Clientset, job *batch.Job, podInfo common.PodInfo) JobDetail {
	detail := JobDetail{Job: toJob(job, &podInfo)}
	detail.Status = job.Status.Conditions
	detail.Completions = job.Spec.Completions
	detail.PodList = getJobToPod(client, job)
	return detail
}

View File

@@ -0,0 +1,77 @@
package job
import (
	"context"

	"go.uber.org/zap"
	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"

	"pandax/apps/devops/entity/k8s"
	k8scommon "pandax/apps/devops/services/k8s/common"
	"pandax/apps/devops/services/k8s/event"
	"pandax/apps/devops/services/k8s/pods"
	"pandax/base/global"
)
// PodList is the presentation-layer list of pods belonging to a job.
type PodList struct {
	// ListMeta carries list-level metadata (total item count).
	ListMeta k8s.ListMeta `json:"listMeta"`

	// Basic information about resources status on the list.
	Status k8scommon.ResourceStatus `json:"status"`

	// Unordered list of Pods.
	Pods []pods.Pod `json:"pods"`
}
// getJobPodInfo returns simple info about pods (running, desired, failing,
// etc.) related to the given job.
// NOTE(review): assumes job.Spec.Selector is non-nil — true for API-created
// jobs where the controller defaults it; confirm for hand-built objects.
func getJobPodInfo(client *kubernetes.Clientset, job *batch.Job) (*k8scommon.PodInfo, error) {
	selector := labels.SelectorFromSet(job.Spec.Selector.MatchLabels)
	listOptions := metaV1.ListOptions{
		LabelSelector: selector.String(),
		FieldSelector: fields.Everything().String(),
	}
	channels := &k8scommon.ResourceChannels{
		PodList: k8scommon.GetPodListChannelWithOptions(
			client, k8scommon.NewSameNamespaceQuery(job.Namespace), listOptions, 1),
	}

	podList := <-channels.PodList.List
	if err := <-channels.PodList.Error; err != nil {
		return nil, err
	}

	podInfo := k8scommon.GetPodInfo(job.Status.Active, job.Spec.Completions, podList.Items)

	// Mirror `kubectl describe` behavior: job counters come from the job
	// status, not from scanning pod phases.
	podInfo.Running = job.Status.Active
	podInfo.Succeeded = job.Status.Succeeded
	podInfo.Failed = job.Status.Failed
	return &podInfo, nil
}
// getJobToPod lists the pods selected by the job's label selector and
// converts them into the presentation-layer PodList. It returns nil when the
// selector is invalid or the pod list cannot be fetched.
func getJobToPod(client *kubernetes.Clientset, job *batch.Job) *PodList {
	selector, err := metaV1.LabelSelectorAsSelector(job.Spec.Selector)
	if err != nil {
		return nil
	}

	options := metaV1.ListOptions{LabelSelector: selector.String()}
	podData, err := client.CoreV1().Pods(job.Namespace).List(context.TODO(), options)
	if err != nil {
		global.Log.Error("Get a pod exception from the job", zap.Any("err", err))
		// BUG FIX: the original only logged here and then dereferenced the
		// nil podData below, panicking on any List failure.
		return nil
	}

	podList := PodList{
		Pods:     make([]pods.Pod, 0, len(podData.Items)),
		ListMeta: k8s.ListMeta{TotalItems: len(podData.Items)},
	}
	for i := range podData.Items {
		pod := podData.Items[i]
		warnings := event.GetPodsEventWarnings(nil, []v1.Pod{pod})
		podList.Pods = append(podList.Pods, pods.ToPod(&pod, warnings))
	}
	return &podList
}