From 0294b2ad69e43e76921e2760df4e8dd590a1695b Mon Sep 17 00:00:00 2001 From: HR Wu <5631010+heiruwu@users.noreply.github.com> Date: Thu, 13 Apr 2023 11:48:43 +0800 Subject: [PATCH 1/3] fix: probe requests block thread (#13) Because - avoid probing request block subsequent request This commit - add nested threading in probing --- cmd/main/main.go | 51 +++++++---- pkg/service/connector.go | 177 ++++++++++++++++++++++++--------------- pkg/service/model.go | 48 +++++++---- pkg/service/pipeline.go | 174 ++++++++++++++++++++------------------ pkg/service/service.go | 122 +++++++++++++++------------ 5 files changed, 335 insertions(+), 237 deletions(-) diff --git a/cmd/main/main.go b/cmd/main/main.go index 2c74623..50ae6fb 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -9,6 +9,7 @@ import ( "os/signal" "regexp" "strings" + "sync" "syscall" "time" @@ -207,32 +208,52 @@ func main() { go func() { logger.Info("[controller] control loop started") + var mainWG sync.WaitGroup for { logger.Info("[Controller] --------------Start probing------------") + + mainWG.Add(5) + // Backend services - if err := service.ProbeBackend(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { - logger.Error(err.Error()) - } + go func() { + defer mainWG.Done() + if err := service.ProbeBackend(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { + logger.Error(err.Error()) + } + }() // Models - if err := service.ProbeModels(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { - logger.Error(err.Error()) - } + go func() { + defer mainWG.Done() + if err := service.ProbeModels(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { + logger.Error(err.Error()) + } + }() // Connectors - if err := service.ProbeSourceConnectors(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { - logger.Error(err.Error()) - } - if err := service.ProbeDestinationConnectors(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { - logger.Error(err.Error()) - } + go func() { + defer mainWG.Done() + if err := service.ProbeSourceConnectors(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { + logger.Error(err.Error()) + } + }() + go func() { + defer mainWG.Done() + if err := service.ProbeDestinationConnectors(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { + logger.Error(err.Error()) + } + }() // Pipelines - if err := service.ProbePipelines(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { - logger.Error(err.Error()) - } + go func() { + defer mainWG.Done() + if err := service.ProbePipelines(context.WithTimeout(ctx, config.Config.Server.Timeout*time.Second)); err != nil { + logger.Error(err.Error()) + } + }() time.Sleep(config.Config.Server.LoopInterval * time.Second) + mainWG.Wait() } }() diff --git a/pkg/service/connector.go b/pkg/service/connector.go index 96fa36c..1e97708 100644 --- a/pkg/service/connector.go +++ b/pkg/service/connector.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strconv" + "sync" "cloud.google.com/go/longrunning/autogen/longrunningpb" "github.com/instill-ai/controller/internal/logger" @@ -16,6 +17,10 @@ import ( func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.CancelFunc) error { defer cancel() + logger, _ := logger.GetZapLogger() + + var wg sync.WaitGroup + resp, err := s.connectorPublicClient.ListSourceConnectors(ctx, &connectorPB.ListSourceConnectorsRequest{}) if err != nil { @@ -26,6 +31,8 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc nextPageToken := &resp.NextPageToken totalSize := resp.TotalSize + wg.Add(int(totalSize)) + for totalSize > util.DefaultPageSize { resp, err := s.connectorPublicClient.ListSourceConnectors(ctx, &connectorPB.ListSourceConnectorsRequest{ PageToken: nextPageToken, @@ -41,50 +48,67 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc } for _, connector := range connectors { - resourceName := util.ConvertRequestToResourceName(connector.Name) - - // if user desires disconnected - if connector.Connector.State == connectorPB.Connector_STATE_DISCONNECTED { - if err := s.UpdateResourceState(ctx, &controllerPB.Resource{ - Name: resourceName, - State: &controllerPB.Resource_ConnectorState{ - ConnectorState: connectorPB.Connector_STATE_DISCONNECTED, - }, - }); err != nil { - return err - } - } - // if user desires connected - workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName) - // check if there is an ongoing workflow - if workflowId != nil { - opInfo, err := s.getOperationInfo(*workflowId, util.RESOURCE_TYPE_SOURCE_CONNECTOR) - if err != nil { - return err - } - if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil { - return err - } - // if not trigger connector check workflow - } else { - resp, err := s.connectorPrivateClient.CheckSourceConnector(ctx, &connectorPB.CheckSourceConnectorRequest{ - Name: connector.Name, - }) - if err != nil { - return err + + go func(connector *connectorPB.SourceConnector) { + defer wg.Done() + + resourceName := util.ConvertRequestToResourceName(connector.Name) + + // if user desires disconnected + if connector.Connector.State == connectorPB.Connector_STATE_DISCONNECTED { + if err := s.UpdateResourceState(ctx, &controllerPB.Resource{ + Name: resourceName, + State: &controllerPB.Resource_ConnectorState{ + ConnectorState: connectorPB.Connector_STATE_DISCONNECTED, + }, + }); err != nil { + logger.Error(err.Error()) + return + } } - // non grpc/http connector, save workflowid - if err := s.updateStaleConnector(ctx, resourceName, resp.WorkflowId); err != nil { - return err + // if user desires connected + workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName) + // check if there is an ongoing workflow + if workflowId != nil { + opInfo, err := s.getOperationInfo(*workflowId, util.RESOURCE_TYPE_SOURCE_CONNECTOR) + if err != nil { + logger.Error(err.Error()) + return + } + if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil { + logger.Error(err.Error()) + return + } + // if not trigger connector check workflow + } else { + resp, err := s.connectorPrivateClient.CheckSourceConnector(ctx, &connectorPB.CheckSourceConnectorRequest{ + Name: connector.Name, + }) + if err != nil { + logger.Error(err.Error()) + return + } + // non grpc/http connector, save workflowid + if err := s.updateStaleConnector(ctx, resourceName, resp.WorkflowId); err != nil { + logger.Error(err.Error()) + return + } } - } + }(connector) } + + wg.Wait() + return nil } func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context.CancelFunc) error { defer cancel() + logger, _ := logger.GetZapLogger() + + var wg sync.WaitGroup + resp, err := s.connectorPublicClient.ListDestinationConnectors(ctx, &connectorPB.ListDestinationConnectorsRequest{}) if err != nil { @@ -109,44 +133,59 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context connectors = append(connectors, resp.DestinationConnectors...) } + wg.Add(len(connectors)) + for _, connector := range connectors { - resourceName := util.ConvertRequestToResourceName(connector.Name) - - // if user desires disconnected - if connector.Connector.State == connectorPB.Connector_STATE_DISCONNECTED { - if err := s.UpdateResourceState(ctx, &controllerPB.Resource{ - Name: resourceName, - State: &controllerPB.Resource_ConnectorState{ - ConnectorState: connectorPB.Connector_STATE_DISCONNECTED, - }, - }); err != nil { - return err - } - } - // if user desires connected - workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName) - // check if there is an ongoing workflow - if workflowId != nil { - opInfo, err := s.getOperationInfo(*workflowId, util.RESOURCE_TYPE_DESTINATION_CONNECTOR) - if err != nil { - return err - } - if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil { - return err - } - // if not trigger connector check workflow - } else { - resp, err := s.connectorPrivateClient.CheckDestinationConnector(ctx, &connectorPB.CheckDestinationConnectorRequest{ - Name: connector.Name, - }) - if err != nil { - return err + + go func(connector *connectorPB.DestinationConnector) { + defer wg.Done() + + resourceName := util.ConvertRequestToResourceName(connector.Name) + + // if user desires disconnected + if connector.Connector.State == connectorPB.Connector_STATE_DISCONNECTED { + if err := s.UpdateResourceState(ctx, &controllerPB.Resource{ + Name: resourceName, + State: &controllerPB.Resource_ConnectorState{ + ConnectorState: connectorPB.Connector_STATE_DISCONNECTED, + }, + }); err != nil { + logger.Error(err.Error()) + return + } } - if err := s.updateStaleConnector(ctx, resourceName, resp.WorkflowId); err != nil { - return err + // if user desires connected + workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName) + // check if there is an ongoing workflow + if workflowId != nil { + opInfo, err := s.getOperationInfo(*workflowId, util.RESOURCE_TYPE_DESTINATION_CONNECTOR) + if err != nil { + logger.Error(err.Error()) + return + } + if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil { + logger.Error(err.Error()) + return + } + // if not trigger connector check workflow + } else { + resp, err := s.connectorPrivateClient.CheckDestinationConnector(ctx, &connectorPB.CheckDestinationConnectorRequest{ + Name: connector.Name, + }) + if err != nil { + logger.Error(err.Error()) + return + } + if err := s.updateStaleConnector(ctx, resourceName, resp.WorkflowId); err != nil { + logger.Error(err.Error()) + return + } } - } + }(connector) } + + wg.Wait() + return nil } diff --git a/pkg/service/model.go b/pkg/service/model.go index 3e1699b..cf91838 100644 --- a/pkg/service/model.go +++ b/pkg/service/model.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "sync" "github.com/instill-ai/controller/internal/logger" "github.com/instill-ai/controller/internal/util" @@ -16,6 +17,8 @@ func (s *service) ProbeModels(ctx context.Context, cancel context.CancelFunc) er logger, _ := logger.GetZapLogger() + var wg sync.WaitGroup + resp, err := s.modelPublicClient.ListModels(ctx, &modelPB.ListModelsRequest{}) if err != nil { @@ -40,28 +43,37 @@ func (s *service) ProbeModels(ctx context.Context, cancel context.CancelFunc) er models = append(models, resp.Models...) } + wg.Add(len(models)) + for _, model := range models { - resp, err := s.modelPrivateClient.CheckModel(ctx, &modelPB.CheckModelRequest{ - Name: model.Name, - }) - if err != nil { - return err - } + go func(model *modelPB.Model) { + defer wg.Done() + + if resp, err := s.modelPrivateClient.CheckModel(ctx, &modelPB.CheckModelRequest{ + Name: model.Name, + }); err == nil { + if err = s.UpdateResourceState(ctx, &controllerPB.Resource{ + Name: util.ConvertRequestToResourceName(model.Name), + State: &controllerPB.Resource_ModelState{ + ModelState: resp.State, + }, + }); err != nil { + logger.Error(err.Error()) + return + } + } else { + logger.Error(err.Error()) + return + } + + logResp, _ := s.GetResourceState(ctx, util.ConvertRequestToResourceName(model.Name)) + logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) + }(model) - err = s.UpdateResourceState(ctx, &controllerPB.Resource{ - Name: util.ConvertRequestToResourceName(model.Name), - State: &controllerPB.Resource_ModelState{ - ModelState: resp.State, - }, - }) + } - if err != nil { - return err - } + wg.Wait() - logResp, _ := s.GetResourceState(ctx, util.ConvertRequestToResourceName(model.Name)) - logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) - } return nil } diff --git a/pkg/service/pipeline.go b/pkg/service/pipeline.go index aafc553..ec2689f 100644 --- a/pkg/service/pipeline.go +++ b/pkg/service/pipeline.go @@ -3,6 +3,7 @@ package service import ( "context" "fmt" + "sync" "github.com/instill-ai/controller/internal/logger" "github.com/instill-ai/controller/internal/util" @@ -18,6 +19,8 @@ func (s *service) ProbePipelines(ctx context.Context, cancel context.CancelFunc) logger, _ := logger.GetZapLogger() + var wg sync.WaitGroup + resp, err := s.pipelinePrivateClient.ListPipelinesAdmin(ctx, &pipelinePB.ListPipelinesAdminRequest{ View: pipelinePB.View_VIEW_FULL.Enum(), }) @@ -45,106 +48,117 @@ func (s *service) ProbePipelines(ctx context.Context, cancel context.CancelFunc) pipelines = append(pipelines, resp.Pipelines...) } + wg.Add(len(pipelines)) + for _, pipeline := range pipelines { - resourceName := util.ConvertRequestToResourceName(pipeline.Name) - pipelineResource := controllerPB.Resource{ - Name: resourceName, - State: &controllerPB.Resource_PipelineState{ - PipelineState: pipelinePB.Pipeline_STATE_INACTIVE, - }, - } + go func(pipeline *pipelinePB.Pipeline) { + defer wg.Done() - // user desires inactive - if pipeline.State == pipelinePB.Pipeline_STATE_INACTIVE { - if err := s.UpdateResourceState(ctx, &pipelineResource); err != nil { - return err - } else { - return nil + resourceName := util.ConvertRequestToResourceName(pipeline.Name) + + pipelineResource := controllerPB.Resource{ + Name: resourceName, + State: &controllerPB.Resource_PipelineState{ + PipelineState: pipelinePB.Pipeline_STATE_INACTIVE, + }, } - } - // user desires active, now check each component's state - pipelineResource.State = &controllerPB.Resource_PipelineState{PipelineState: pipelinePB.Pipeline_STATE_ERROR} + // user desires inactive + if pipeline.State == pipelinePB.Pipeline_STATE_INACTIVE { + if err := s.UpdateResourceState(ctx, &pipelineResource); err != nil { + logger.Error(err.Error()) + return + } else { + return + } + } - var resources []*controllerPB.Resource + // user desires active, now check each component's state + pipelineResource.State = &controllerPB.Resource_PipelineState{PipelineState: pipelinePB.Pipeline_STATE_ERROR} - sourceConnectorResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(pipeline.Recipe.Source)) - if err != nil { - s.UpdateResourceState(ctx, &pipelineResource) - logger.Error("no record find for source connector") - return err - } - resources = append(resources, sourceConnectorResource) + var resources []*controllerPB.Resource - destinationConnectorResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(pipeline.Recipe.Destination)) - if err != nil { - s.UpdateResourceState(ctx, &pipelineResource) - logger.Error("no record find for destination connector") - return err - } - resources = append(resources, destinationConnectorResource) + sourceConnectorResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(pipeline.Recipe.Source)) + if err != nil { + s.UpdateResourceState(ctx, &pipelineResource) + logger.Error("no record found for source connector") + return + } + resources = append(resources, sourceConnectorResource) - modelNames := pipeline.Recipe.Models - for _, modelName := range modelNames { - modelResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(modelName)) + destinationConnectorResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(pipeline.Recipe.Destination)) if err != nil { s.UpdateResourceState(ctx, &pipelineResource) - logger.Error(fmt.Sprintf("no record find for model %v", modelName)) - return err + logger.Error("no record found for destination connector") + return } + resources = append(resources, destinationConnectorResource) + + modelNames := pipeline.Recipe.Models + for _, modelName := range modelNames { + modelResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(modelName)) + if err != nil { + s.UpdateResourceState(ctx, &pipelineResource) + logger.Error(fmt.Sprintf("no record found for model %v", modelName)) + return + } - resources = append(resources, modelResource) - } + resources = append(resources, modelResource) + } - for _, r := range resources { - switch v := r.State.(type) { - case *controllerPB.Resource_ConnectorState: - switch v.ConnectorState { - case connectorPB.Connector_STATE_DISCONNECTED: - pipelineResource.State = &controllerPB.Resource_PipelineState{ - PipelineState: pipelinePB.Pipeline_STATE_INACTIVE, - } - case connectorPB.Connector_STATE_UNSPECIFIED: - pipelineResource.State = &controllerPB.Resource_PipelineState{ - PipelineState: pipelinePB.Pipeline_STATE_UNSPECIFIED, - } - case connectorPB.Connector_STATE_ERROR: - pipelineResource.State = &controllerPB.Resource_PipelineState{ - PipelineState: pipelinePB.Pipeline_STATE_ERROR, - } - default: - continue - } - case *controllerPB.Resource_ModelState: - switch v.ModelState { - case modelPB.Model_STATE_OFFLINE: - pipelineResource.State = &controllerPB.Resource_PipelineState{ - PipelineState: pipelinePB.Pipeline_STATE_INACTIVE, - } - case modelPB.Model_STATE_UNSPECIFIED: - pipelineResource.State = &controllerPB.Resource_PipelineState{ - PipelineState: pipelinePB.Pipeline_STATE_UNSPECIFIED, + for _, r := range resources { + switch v := r.State.(type) { + case *controllerPB.Resource_ConnectorState: + switch v.ConnectorState { + case connectorPB.Connector_STATE_DISCONNECTED: + pipelineResource.State = &controllerPB.Resource_PipelineState{ + PipelineState: pipelinePB.Pipeline_STATE_INACTIVE, + } + case connectorPB.Connector_STATE_UNSPECIFIED: + pipelineResource.State = &controllerPB.Resource_PipelineState{ + PipelineState: pipelinePB.Pipeline_STATE_UNSPECIFIED, + } + case connectorPB.Connector_STATE_ERROR: + pipelineResource.State = &controllerPB.Resource_PipelineState{ + PipelineState: pipelinePB.Pipeline_STATE_ERROR, + } + default: + continue } - case modelPB.Model_STATE_ERROR: - pipelineResource.State = &controllerPB.Resource_PipelineState{ - PipelineState: pipelinePB.Pipeline_STATE_ERROR, + case *controllerPB.Resource_ModelState: + switch v.ModelState { + case modelPB.Model_STATE_OFFLINE: + pipelineResource.State = &controllerPB.Resource_PipelineState{ + PipelineState: pipelinePB.Pipeline_STATE_INACTIVE, + } + case modelPB.Model_STATE_UNSPECIFIED: + pipelineResource.State = &controllerPB.Resource_PipelineState{ + PipelineState: pipelinePB.Pipeline_STATE_UNSPECIFIED, + } + case modelPB.Model_STATE_ERROR: + pipelineResource.State = &controllerPB.Resource_PipelineState{ + PipelineState: pipelinePB.Pipeline_STATE_ERROR, + } + default: + continue } - default: - continue } + s.UpdateResourceState(ctx, &pipelineResource) + return } - s.UpdateResourceState(ctx, &pipelineResource) - return nil - } - pipelineResource.State = &controllerPB.Resource_PipelineState{ - PipelineState: pipelinePB.Pipeline_STATE_ACTIVE, - } - s.UpdateResourceState(ctx, &pipelineResource) + pipelineResource.State = &controllerPB.Resource_PipelineState{ + PipelineState: pipelinePB.Pipeline_STATE_ACTIVE, + } + s.UpdateResourceState(ctx, &pipelineResource) - logResp, _ := s.GetResourceState(ctx, resourceName) - logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) + logResp, _ := s.GetResourceState(ctx, resourceName) + logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) + }(pipeline) } + + wg.Wait() + return nil } diff --git a/pkg/service/service.go b/pkg/service/service.go index 500c566..1943fa0 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" "cloud.google.com/go/longrunning/autogen/longrunningpb" @@ -247,6 +248,8 @@ func (s *service) ProbeBackend(ctx context.Context, cancel context.CancelFunc) e logger, _ := logger.GetZapLogger() + var wg sync.WaitGroup + healthcheck := healthcheckPB.HealthCheckResponse{ Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_UNSPECIFIED, } @@ -259,84 +262,93 @@ func (s *service) ProbeBackend(ctx context.Context, cancel context.CancelFunc) e config.Config.MgmtBackend.Host, } + wg.Add(len(backenServices)) + for _, hostname := range backenServices { - switch hostname { - case config.Config.TritonServer.Host: - resp, err := s.tritonClient.ServerLive(ctx, &inferenceserver.ServerLiveRequest{}) + go func(hostname string) { + defer wg.Done() - if err != nil { - healthcheck = healthcheckPB.HealthCheckResponse{ - Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, - } - } else { - if resp.GetLive() { + switch hostname { + case config.Config.TritonServer.Host: + resp, err := s.tritonClient.ServerLive(ctx, &inferenceserver.ServerLiveRequest{}) + + if err != nil { healthcheck = healthcheckPB.HealthCheckResponse{ - Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_SERVING, + Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, } } else { + if resp.GetLive() { + healthcheck = healthcheckPB.HealthCheckResponse{ + Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_SERVING, + } + } else { + healthcheck = healthcheckPB.HealthCheckResponse{ + Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, + } + } + } + case config.Config.ModelBackend.Host: + resp, err := s.modelPublicClient.Liveness(ctx, &modelPB.LivenessRequest{}) + + if err != nil { healthcheck = healthcheckPB.HealthCheckResponse{ Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, } + } else { + healthcheck = *resp.GetHealthCheckResponse() } - } - case config.Config.ModelBackend.Host: - resp, err := s.modelPublicClient.Liveness(ctx, &modelPB.LivenessRequest{}) + case config.Config.PipelineBackend.Host: + resp, err := s.pipelinePublicClient.Liveness(ctx, &pipelinePB.LivenessRequest{}) - if err != nil { - healthcheck = healthcheckPB.HealthCheckResponse{ - Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, + if err != nil { + healthcheck = healthcheckPB.HealthCheckResponse{ + Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, + } + } else { + healthcheck = *resp.GetHealthCheckResponse() } - } else { - healthcheck = *resp.GetHealthCheckResponse() - } - case config.Config.PipelineBackend.Host: - resp, err := s.pipelinePublicClient.Liveness(ctx, &pipelinePB.LivenessRequest{}) + case config.Config.MgmtBackend.Host: + resp, err := s.mgmtPublicClient.Liveness(ctx, &mgmtPB.LivenessRequest{}) - if err != nil { - healthcheck = healthcheckPB.HealthCheckResponse{ - Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, + if err != nil { + healthcheck = healthcheckPB.HealthCheckResponse{ + Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, + } + } else { + healthcheck = *resp.GetHealthCheckResponse() } - } else { - healthcheck = *resp.GetHealthCheckResponse() - } - case config.Config.MgmtBackend.Host: - resp, err := s.mgmtPublicClient.Liveness(ctx, &mgmtPB.LivenessRequest{}) + case config.Config.ConnectorBackend.Host: + resp, err := s.connectorPublicClient.Liveness(ctx, &connectorPB.LivenessRequest{}) - if err != nil { - healthcheck = healthcheckPB.HealthCheckResponse{ - Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, + if err != nil { + healthcheck = healthcheckPB.HealthCheckResponse{ + Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, + } + } else { + healthcheck = *resp.GetHealthCheckResponse() } - } else { - healthcheck = *resp.GetHealthCheckResponse() } - case config.Config.ConnectorBackend.Host: - resp, err := s.connectorPublicClient.Liveness(ctx, &connectorPB.LivenessRequest{}) + + err := s.UpdateResourceState(ctx, &controllerPB.Resource{ + Name: util.ConvertServiceToResourceName(hostname), + State: &controllerPB.Resource_BackendState{ + BackendState: healthcheck.Status, + }, + }) if err != nil { - healthcheck = healthcheckPB.HealthCheckResponse{ - Status: healthcheckPB.HealthCheckResponse_SERVING_STATUS_NOT_SERVING, - } - } else { - healthcheck = *resp.GetHealthCheckResponse() + logger.Error(err.Error()) + return } - } - err := s.UpdateResourceState(ctx, &controllerPB.Resource{ - Name: util.ConvertServiceToResourceName(hostname), - State: &controllerPB.Resource_BackendState{ - BackendState: healthcheck.Status, - }, - }) + resp, _ := s.GetResourceState(ctx, util.ConvertServiceToResourceName(hostname)) - if err != nil { - return err - } - - resp, _ := s.GetResourceState(ctx, util.ConvertServiceToResourceName(hostname)) - - logger.Info(fmt.Sprintf("[Controller] Got %v", resp)) + logger.Info(fmt.Sprintf("[Controller] Got %v", resp)) + }(hostname) } + wg.Wait() + return nil } From a086a554c917f13e0c4c9bad5b0bcdeed31a18f5 Mon Sep 17 00:00:00 2001 From: HR Wu <5631010+heiruwu@users.noreply.github.com> Date: Sat, 15 Apr 2023 01:42:37 +0800 Subject: [PATCH 2/3] chore(connector): adopt removal of search attributes (#16) Because - remove search attributes in connector-backend This commit - update connector probing logic to passively wait for connector-backend to update the state when check workflow is done --- pkg/service/connector.go | 54 +++++++++++----------------------------- pkg/service/service.go | 50 +++++++++++++------------------------ 2 files changed, 31 insertions(+), 73 deletions(-) diff --git a/pkg/service/connector.go b/pkg/service/connector.go index 1e97708..4fc35eb 100644 --- a/pkg/service/connector.go +++ b/pkg/service/connector.go @@ -3,10 +3,8 @@ package service import ( "context" "fmt" - "strconv" "sync" - "cloud.google.com/go/longrunning/autogen/longrunningpb" "github.com/instill-ai/controller/internal/logger" "github.com/instill-ai/controller/internal/util" @@ -75,9 +73,11 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc logger.Error(err.Error()) return } - if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil { - logger.Error(err.Error()) - return + if opInfo.Done { + if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil { + logger.Error(err.Error()) + return + } } // if not trigger connector check workflow } else { @@ -94,6 +94,8 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc return } } + logResp, _ := s.GetResourceState(ctx, resourceName) + logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) }(connector) } @@ -163,9 +165,11 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context logger.Error(err.Error()) return } - if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil { - logger.Error(err.Error()) - return + if opInfo.Done { + if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil { + logger.Error(err.Error()) + return + } } // if not trigger connector check workflow } else { @@ -181,6 +185,8 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context return } } + logResp, _ := s.GetResourceState(ctx, resourceName) + logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) }(connector) } @@ -189,36 +195,7 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context return nil } -func (s *service) updateRunningConnector(ctx context.Context, resourceName string, opInfo longrunningpb.Operation) error { - logger, _ := logger.GetZapLogger() - - // if workflow done get result, otherwise remains same state - if opInfo.Done { - stateInt, err := strconv.ParseInt(string(opInfo.GetResponse().Value[:]), 10, 32) - if err != nil { - return err - } - if err := s.UpdateResourceState(ctx, &controllerPB.Resource{ - Name: resourceName, - State: &controllerPB.Resource_ConnectorState{ - ConnectorState: connectorPB.Connector_State(stateInt), - }, - }); err != nil { - return err - } - if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil { - return err - } - } - - logResp, _ := s.GetResourceState(ctx, resourceName) - logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) - - return nil -} - func (s *service) updateStaleConnector(ctx context.Context, resourceName string, workflowId string) error { - logger, _ := logger.GetZapLogger() // non grpc/http connector, save workflowid if workflowId != "" { if err := s.UpdateResourceWorkflowId(ctx, resourceName, workflowId); err != nil { @@ -235,8 +212,5 @@ func (s *service) updateStaleConnector(ctx context.Context, resourceName string, } } - logResp, _ := s.GetResourceState(ctx, resourceName) - logger.Info(fmt.Sprintf("[Controller] Got %v", logResp)) - return nil } diff --git a/pkg/service/service.go b/pkg/service/service.go index 1943fa0..a7849c9 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -137,6 +137,23 @@ func (s *service) UpdateResourceState(ctx context.Context, resource *controllerP switch resourceType { case util.RESOURCE_TYPE_MODEL: state = int(resource.GetModelState()) + if workflowId != nil { + if len(*workflowId) > 1 { + if opInfo, err := s.getOperationInfo(*workflowId, resourceType); err != nil { + return err + } else { + if opInfo != nil { + if !opInfo.Done { + state = int(modelPB.Model_STATE_UNSPECIFIED) + } else { + if err := s.DeleteResourceWorkflowId(ctx, resource.Name); err != nil { + return err + } + } + } + } + } + } case util.RESOURCE_TYPE_PIPELINE: state = int(resource.GetPipelineState()) case util.RESOURCE_TYPE_SOURCE_CONNECTOR, util.RESOURCE_TYPE_DESTINATION_CONNECTOR: @@ -147,39 +164,6 @@ func (s *service) UpdateResourceState(ctx context.Context, resource *controllerP return fmt.Errorf(fmt.Sprintf("update resource type %s not implemented", resourceType)) } - // only for models - if workflowId != nil { - if len(*workflowId) > 1 { - opInfo, err := s.getOperationInfo(*workflowId, resourceType) - - if err != nil { - return err - } - - if opInfo != nil { - - if !opInfo.Done { - switch resourceType { - case util.RESOURCE_TYPE_MODEL: - state = int(modelPB.Model_STATE_UNSPECIFIED) - case util.RESOURCE_TYPE_PIPELINE: - state = int(pipelinePB.Pipeline_STATE_UNSPECIFIED) - case util.RESOURCE_TYPE_SOURCE_CONNECTOR, util.RESOURCE_TYPE_DESTINATION_CONNECTOR: - state = int(connectorPB.Connector_STATE_UNSPECIFIED) - case util.RESOURCE_TYPE_SERVICE: - state = int(healthcheckPB.HealthCheckResponse_SERVING_STATUS_UNSPECIFIED) - default: - return fmt.Errorf(fmt.Sprintf("resource type %s not implemented", resourceType)) - } - } else { - if err := s.DeleteResourceWorkflowId(ctx, resource.Name); err != nil { - return err - } - } - } - } - } - _, err := s.etcdClient.Put(ctx, resource.Name, fmt.Sprint(state)) if err != nil { From 86a6ba4fed6f24194defdfeeab21234f706a3a89 Mon Sep 17 00:00:00 2001 From: Instill AI Bot <70758845+droplet-bot@users.noreply.github.com> Date: Sat, 15 Apr 2023 23:15:04 +0100 Subject: [PATCH 3/3] chore(main): release 0.1.1-alpha (#17) :robot: I have created a release *beep* *boop* --- ## [0.1.1-alpha](https://github.com/instill-ai/controller/compare/v0.1.0-alpha...v0.1.1-alpha) (2023-04-15) ### Bug Fixes * probe requests block thread ([#13](https://github.com/instill-ai/controller/issues/13)) ([0294b2a](https://github.com/instill-ai/controller/commit/0294b2ad69e43e76921e2760df4e8dd590a1695b)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). --- CHANGELOG.md | 7 +++++++ release-please/manifest.json | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 188a522..f27d3f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.1.1-alpha](https://github.com/instill-ai/controller/compare/v0.1.0-alpha...v0.1.1-alpha) (2023-04-15) + + +### Bug Fixes + +* probe requests block thread ([#13](https://github.com/instill-ai/controller/issues/13)) ([0294b2a](https://github.com/instill-ai/controller/commit/0294b2ad69e43e76921e2760df4e8dd590a1695b)) + ## [0.1.0-alpha](https://github.com/instill-ai/controller/compare/v0.0.0-alpha...v0.1.0-alpha) (2023-04-08) diff --git a/release-please/manifest.json b/release-please/manifest.json index 6bf28b4..2a46094 100644 --- a/release-please/manifest.json +++ b/release-please/manifest.json @@ -1,3 +1,3 @@ { - ".": "0.1.0-alpha" + ".": "0.1.1-alpha" }