How to Autoscale a Kubernetes Cluster? A Complete Guide to Cluster Autoscaler (Part 3)

ScaleUp Source Code Analysis

The first half of ScaleUp validates the cluster state and computes how much resource headroom is left for expansion:

```go
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry,
	unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet,
	nodeInfos map[string]*schedulernodeinfo.NodeInfo, ignoredTaints taints.TaintKeySet) (*status.ScaleUpStatus, errors.AutoscalerError) {
	......
	// Verify that every ready node in the cluster belongs to a node group;
	// collect all nodes that are not part of any autoscaled group.
	nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
	if err != nil {
		return &status.ScaleUpStatus{Result: status.ScaleUpError}, err.AddPrefix("failed to filter out nodes which are from not autoscaled groups: ")
	}

	nodeGroups := context.CloudProvider.NodeGroups()
	gpuLabel := context.CloudProvider.GPULabel()
	availableGPUTypes := context.CloudProvider.GetAvailableGPUTypes()

	// The resource limiter is injected when the cloud provider is built.
	// A CloudProvider implementation may change it if needed, but doing so
	// is discouraged because it confuses users.
	resourceLimiter, errCP := context.CloudProvider.GetResourceLimiter()
	if errCP != nil {
		return &status.ScaleUpStatus{Result: status.ScaleUpError}, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
	}

	// Compute the resource limits.
	// nodeInfos maps every node group to an example node. Data from a real
	// node is preferred for the example node; if a node group has no real
	// node deployed yet, node data from its Template is used instead.
	// (A simplified sketch of this computation follows this listing.)
	scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context.CloudProvider, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)
	if errLimits != nil {
		return &status.ScaleUpStatus{Result: status.ScaleUpError}, errLimits.AddPrefix("Could not compute total resources: ")
	}

	// From the current nodes and the node groups, work out how many nodes are
	// about to join the cluster. Cloud providers' increase-size operations do
	// not add nodes synchronously, so the upcoming nodes are counted here so
	// that node resources can be computed correctly later on.
	upcomingNodes := make([]*schedulernodeinfo.NodeInfo, 0)
	for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
		......
	}
	klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))

	// The node groups that will eventually be considered for expansion.
	expansionOptions := make(map[string]expander.Option, 0)
	......
	// Node groups that cannot accept new nodes due to some limit or error,
	// e.g. a node group that has already reached its MaxSize.
	skippedNodeGroups := map[string]status.Reasons{}
	// Filter the node groups, taking all of the above into account.
	for _, nodeGroup := range nodeGroups {
		......
	}
	if len(expansionOptions) == 0 {
		klog.V(1).Info("No expansion options")
		return &status.ScaleUpStatus{
			Result:                  status.ScaleUpNoOptionsAvailable,
			PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
			ConsideredNodeGroups:    nodeGroups,
		}, nil
	}
	......
```
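The "resources left" step above boils down to subtracting what the existing nodes (autoscaled and non-autoscaled alike) already consume from the cluster-wide maximums configured via the `--cores-total` and `--memory-total` flags. The following is a minimal, self-contained sketch of that idea only; `nodeResources` and `scaleUpResourcesLeft` are illustrative names, not the actual types used by `computeScaleUpResourcesLeftLimits`:

```go
package main

import "fmt"

// nodeResources is an illustrative stand-in for the per-node resource
// totals that the real code derives from each node group's example node.
type nodeResources struct {
	cores    int64
	memoryMB int64
}

// scaleUpResourcesLeft sketches the headroom computation: sum up what the
// existing nodes use, then subtract that from the configured maximums.
func scaleUpResourcesLeft(existing []nodeResources, maxCores, maxMemoryMB int64) (nodeResources, error) {
	var usedCores, usedMemoryMB int64
	for _, n := range existing {
		usedCores += n.cores
		usedMemoryMB += n.memoryMB
	}
	// If the cluster already exceeds its limits, no scale-up is possible.
	if usedCores > maxCores || usedMemoryMB > maxMemoryMB {
		return nodeResources{}, fmt.Errorf("cluster is already above its configured resource limits")
	}
	return nodeResources{
		cores:    maxCores - usedCores,
		memoryMB: maxMemoryMB - usedMemoryMB,
	}, nil
}

func main() {
	// Two existing 4-core/16 GiB nodes against limits of 32 cores / 128 GiB.
	nodes := []nodeResources{{cores: 4, memoryMB: 16384}, {cores: 4, memoryMB: 16384}}
	left, err := scaleUpResourcesLeft(nodes, 32, 131072)
	if err != nil {
		panic(err)
	}
	fmt.Printf("cores left: %d, memory left: %d MB\n", left.cores, left.memoryMB)
}
```

Later, `applyScaleUpResourcesLimits` caps the number of new nodes so that adding them never pushes the cluster past this remaining headroom.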
The second half of ScaleUp then picks the best node group and executes the expansion:

```go
	// Pick the best node group to scale up. The expander chooses a suitable
	// node group; the default is RandomExpander, configured via the
	// --expander flag:
	//   random      pick a random group; fine when there is only one node group
	//   most-pods   pick the group that can schedule the most pods; e.g. when
	//               pending pods carry a nodeSelector, it prefers groups that
	//               satisfy the majority of those pods
	//   least-waste pick the group whose smallest instance type still satisfies
	//               the pods' resource requests, wasting the least resources
	//               (see the toy sketch after this listing)
	//   price       pick the cheapest group according to the price model
	//   priority    pick by user-configured priority
	bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
	if bestOption != nil && bestOption.NodeCount > 0 {
		......
		newNodes := bestOption.NodeCount
		// Recompute how many nodes to add this round, taking upcomingNodes into account.
		if context.MaxNodesTotal > 0 && len(nodes)+newNodes+len(upcomingNodes) > context.MaxNodesTotal {
			klog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
			newNodes = context.MaxNodesTotal - len(nodes) - len(upcomingNodes)
			if newNodes < 1 {
				return &status.ScaleUpStatus{Result: status.ScaleUpError}, errors.NewAutoscalerError(errors.TransientError, "max node total count already reached")
			}
		}

		createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
		// If the node group does not exist on the cloud provider side, try to
		// recreate a cloud node group from the available information. None of
		// the current CloudProvider implementations allow this, however, which
		// makes it look like a redundant code path: cloud providers do not
		// want to, and should not, hand the permission to create cloud node
		// groups over to Cluster Autoscaler.
		if !bestOption.NodeGroup.Exist() {
			oldId := bestOption.NodeGroup.Id()
			createNodeGroupResult, err := processors.NodeGroupManager.CreateNodeGroup(context, bestOption.NodeGroup)
			......
		}

		// Fetch the example node of the best node group.
		nodeInfo, found := nodeInfos[bestOption.NodeGroup.Id()]
		if !found {
			// This should never happen, as we already should have retrieved
			// nodeInfo for any considered nodegroup.
			klog.Errorf("No node info for: %s", bestOption.NodeGroup.Id())
			return &status.ScaleUpStatus{Result: status.ScaleUpError, CreateNodeGroupResults: createNodeGroupResults}, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for best expansion option!")
		}

		// Compute how many nodes are needed from the CPU, memory and possible
		// GPU resources (hack: we assume anything which is not cpu/memory to be a gpu.).
		newNodes, err = applyScaleUpResourcesLimits(context.CloudProvider, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
		if err != nil {
			return &status.ScaleUpStatus{Result: status.ScaleUpError, CreateNodeGroupResults: createNodeGroupResults}, err
		}

		// The node groups to balance between.
		targetNodeGroups := []cloudprovider.NodeGroup{bestOption.NodeGroup}
		// If node-group balancing is enabled (controlled by the
		// balance-similar-node-groups flag), detect similar node groups and
		// balance the node counts between them.
		if context.BalanceSimilarNodeGroups {
			......
		}

		// See (b *BalancingNodeGroupSetProcessor) BalanceScaleUpBetweenGroups
		// for the concrete balancing strategy.
		scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(context, targetNodeGroups, newNodes)
		if typedErr != nil {
			return &status.ScaleUpStatus{Result: status.ScaleUpError, CreateNodeGroupResults: createNodeGroupResults}, typedErr
		}
		klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)

		// Start the scale-up: each group is grown through IncreaseSize.
		for _, info := range scaleUpInfos {
			typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now)
			if typedErr != nil {
				return &status.ScaleUpStatus{Result: status.ScaleUpError, CreateNodeGroupResults: createNodeGroupResults}, typedErr
			}
		}
		......
	}
	......
}
```
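To make the least-waste strategy concrete, here is a toy scoring sketch. The `option` struct and `leastWaste` function are hypothetical simplifications; the real strategy lives behind the `expander.Strategy` interface in k8s.io/autoscaler/cluster-autoscaler/expander and scores full scheduling simulations, not a single pod:

```go
package main

import (
	"fmt"
	"math"
)

// option is an illustrative stand-in for expander.Option: one candidate
// node group plus the shape of the nodes it would add.
type option struct {
	name         string
	nodeCores    float64
	nodeMemoryGB float64
}

// leastWaste scores each candidate by the fraction of a new node's CPU and
// memory that would sit idle after placing the pending pod, and picks the
// lowest score. The boolean reports whether any group can fit the pod.
func leastWaste(opts []option, podCores, podMemoryGB float64) (option, bool) {
	var best option
	bestScore := math.MaxFloat64
	found := false
	for _, o := range opts {
		if o.nodeCores < podCores || o.nodeMemoryGB < podMemoryGB {
			continue // nodes in this group cannot fit the pod at all
		}
		// Idle fraction of CPU plus idle fraction of memory; lower is better.
		score := (o.nodeCores-podCores)/o.nodeCores + (o.nodeMemoryGB-podMemoryGB)/o.nodeMemoryGB
		if score < bestScore {
			best, bestScore, found = o, score, true
		}
	}
	return best, found
}

func main() {
	opts := []option{
		{name: "general-8c32g", nodeCores: 8, nodeMemoryGB: 32},
		{name: "compute-16c32g", nodeCores: 16, nodeMemoryGB: 32},
	}
	// A pod asking for 6 cores / 12 GiB wastes less on the 8-core group.
	if best, ok := leastWaste(opts, 6, 12); ok {
		fmt.Println("least-waste picks:", best.name)
	}
}
```

Whichever strategy is configured, its output is only a choice of node group; the actual growth still goes through `BalanceScaleUpBetweenGroups` and `executeScaleUp` as shown above.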

