Skip to content
This repository has been archived by the owner on Aug 24, 2023. It is now read-only.

Commit

Permalink
chore(connector): adopt removal of search attributes (#16)
Browse files Browse the repository at this point in the history
Because

- remove search attributes in connector-backend

This commit

- update connector probing logic to passively wait for connector-backend
to update the state when check workflow is done
  • Loading branch information
heiruwu authored Apr 14, 2023
1 parent 0294b2a commit a086a55
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 73 deletions.
54 changes: 14 additions & 40 deletions pkg/service/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package service
import (
"context"
"fmt"
"strconv"
"sync"

"cloud.google.com/go/longrunning/autogen/longrunningpb"
"github.com/instill-ai/controller/internal/logger"
"github.com/instill-ai/controller/internal/util"

Expand Down Expand Up @@ -75,9 +73,11 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc
logger.Error(err.Error())
return
}
if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil {
logger.Error(err.Error())
return
if opInfo.Done {
if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil {
logger.Error(err.Error())
return
}
}
// if not trigger connector check workflow
} else {
Expand All @@ -94,6 +94,8 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc
return
}
}
logResp, _ := s.GetResourceState(ctx, resourceName)
logger.Info(fmt.Sprintf("[Controller] Got %v", logResp))
}(connector)
}

Expand Down Expand Up @@ -163,9 +165,11 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context
logger.Error(err.Error())
return
}
if err := s.updateRunningConnector(ctx, resourceName, *opInfo); err != nil {
logger.Error(err.Error())
return
if opInfo.Done {
if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil {
logger.Error(err.Error())
return
}
}
// if not trigger connector check workflow
} else {
Expand All @@ -181,6 +185,8 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context
return
}
}
logResp, _ := s.GetResourceState(ctx, resourceName)
logger.Info(fmt.Sprintf("[Controller] Got %v", logResp))
}(connector)
}

Expand All @@ -189,36 +195,7 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context
return nil
}

func (s *service) updateRunningConnector(ctx context.Context, resourceName string, opInfo longrunningpb.Operation) error {
logger, _ := logger.GetZapLogger()

// if workflow done get result, otherwise remains same state
if opInfo.Done {
stateInt, err := strconv.ParseInt(string(opInfo.GetResponse().Value[:]), 10, 32)
if err != nil {
return err
}
if err := s.UpdateResourceState(ctx, &controllerPB.Resource{
Name: resourceName,
State: &controllerPB.Resource_ConnectorState{
ConnectorState: connectorPB.Connector_State(stateInt),
},
}); err != nil {
return err
}
if err := s.DeleteResourceWorkflowId(ctx, resourceName); err != nil {
return err
}
}

logResp, _ := s.GetResourceState(ctx, resourceName)
logger.Info(fmt.Sprintf("[Controller] Got %v", logResp))

return nil
}

func (s *service) updateStaleConnector(ctx context.Context, resourceName string, workflowId string) error {
logger, _ := logger.GetZapLogger()
// non grpc/http connector, save workflowid
if workflowId != "" {
if err := s.UpdateResourceWorkflowId(ctx, resourceName, workflowId); err != nil {
Expand All @@ -235,8 +212,5 @@ func (s *service) updateStaleConnector(ctx context.Context, resourceName string,
}
}

logResp, _ := s.GetResourceState(ctx, resourceName)
logger.Info(fmt.Sprintf("[Controller] Got %v", logResp))

return nil
}
50 changes: 17 additions & 33 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ func (s *service) UpdateResourceState(ctx context.Context, resource *controllerP
switch resourceType {
case util.RESOURCE_TYPE_MODEL:
state = int(resource.GetModelState())
if workflowId != nil {
if len(*workflowId) > 1 {
if opInfo, err := s.getOperationInfo(*workflowId, resourceType); err != nil {
return err
} else {
if opInfo != nil {
if !opInfo.Done {
state = int(modelPB.Model_STATE_UNSPECIFIED)
} else {
if err := s.DeleteResourceWorkflowId(ctx, resource.Name); err != nil {
return err
}
}
}
}
}
}
case util.RESOURCE_TYPE_PIPELINE:
state = int(resource.GetPipelineState())
case util.RESOURCE_TYPE_SOURCE_CONNECTOR, util.RESOURCE_TYPE_DESTINATION_CONNECTOR:
Expand All @@ -147,39 +164,6 @@ func (s *service) UpdateResourceState(ctx context.Context, resource *controllerP
return fmt.Errorf(fmt.Sprintf("update resource type %s not implemented", resourceType))
}

// only for models
if workflowId != nil {
if len(*workflowId) > 1 {
opInfo, err := s.getOperationInfo(*workflowId, resourceType)

if err != nil {
return err
}

if opInfo != nil {

if !opInfo.Done {
switch resourceType {
case util.RESOURCE_TYPE_MODEL:
state = int(modelPB.Model_STATE_UNSPECIFIED)
case util.RESOURCE_TYPE_PIPELINE:
state = int(pipelinePB.Pipeline_STATE_UNSPECIFIED)
case util.RESOURCE_TYPE_SOURCE_CONNECTOR, util.RESOURCE_TYPE_DESTINATION_CONNECTOR:
state = int(connectorPB.Connector_STATE_UNSPECIFIED)
case util.RESOURCE_TYPE_SERVICE:
state = int(healthcheckPB.HealthCheckResponse_SERVING_STATUS_UNSPECIFIED)
default:
return fmt.Errorf(fmt.Sprintf("resource type %s not implemented", resourceType))
}
} else {
if err := s.DeleteResourceWorkflowId(ctx, resource.Name); err != nil {
return err
}
}
}
}
}

_, err := s.etcdClient.Put(ctx, resource.Name, fmt.Sprint(state))

if err != nil {
Expand Down

0 comments on commit a086a55

Please sign in to comment.