Skip to content
This repository has been archived by the owner on Aug 24, 2023. It is now read-only.

fix(model): adopt latest model structure #7

Merged
merged 6 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
chore: adopt removal of model instance
  • Loading branch information
heiruwu committed Apr 3, 2023
commit 56346bd5f60f1f5b679b792c007d794e6ec4eea6
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0
github.com/instill-ai/model-backend v0.13.1-alpha
github.com/instill-ai/pipeline-backend v0.9.8-alpha
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230329021956-5f3a3104f4cb
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230402125221-c8f1a70b6b8b
github.com/instill-ai/x v0.2.0-alpha
github.com/knadh/koanf v1.5.0
github.com/redis/go-redis/v9 v9.0.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,8 @@ github.com/instill-ai/model-backend v0.13.1-alpha h1:ysmFTc6Iwh8WeZ5pb6q0Ncwd2V7
github.com/instill-ai/model-backend v0.13.1-alpha/go.mod h1:gTtWmE4fZrNV9RddCwJHWcbGGx6iC0ARcFE1iBZGclg=
github.com/instill-ai/pipeline-backend v0.9.8-alpha h1:dJoR0AuiuX9PFkmDkDmTlzVVlqineC/iyfPmsSQZsM8=
github.com/instill-ai/pipeline-backend v0.9.8-alpha/go.mod h1:NE5VTZGnEQuNubWafKtLSt/6OYmaxtjlt9kIxutb3J4=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230329021956-5f3a3104f4cb h1:zOGuf47mpquRN/Pe66TuDOZ1vnpfUoFaOZDURwZN+QI=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230329021956-5f3a3104f4cb/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230402125221-c8f1a70b6b8b h1:BI97L8e4pkbQVcqRyQsR9/Q1/4pXB+zFGUWOEL/hZ6U=
github.com/instill-ai/protogen-go v0.3.3-alpha.0.20230402125221-c8f1a70b6b8b/go.mod h1:7/Jj3ATVozPwB0WmKRM612o/k5UJF8K9oRCNKYH8iy0=
github.com/instill-ai/x v0.2.0-alpha h1:8yszKP9DE8bvSRAtEpOwqhG2wwqU3olhTqhwoiLrHfc=
github.com/instill-ai/x v0.2.0-alpha/go.mod h1:/UEx/zFyMo7so2ctBY0pzjmIoJB9Qz5Y4gvwU2FoU74=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
Expand Down
24 changes: 4 additions & 20 deletions internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,10 @@ import (
"strings"
)

func ConvertModelToResourceName(modelInstanceName string) string {
splitName := strings.SplitN(modelInstanceName, "/", 4)
modelType, modelID, modelInstanceID := splitName[0], splitName[1], splitName[3]
resourceName := fmt.Sprintf("resources/%s-%s/types/%s", modelID, modelInstanceID, modelType)

return resourceName
}

func ConvertConnectorToResourceName(connectorName string) string {
splitName := strings.SplitN(connectorName, "/", 2)
connectorType, name := splitName[0], splitName[1]
resourceName := fmt.Sprintf("resources/%s/types/%s", name, connectorType)

return resourceName
}

func ConvertPipelineToResourceName(pipelineName string) string {
splitName := strings.SplitN(pipelineName, "/", 2)
pipelineType, name := splitName[0], splitName[1]
resourceName := fmt.Sprintf("resources/%s/types/%s", name, pipelineType)
func ConvertRequestToResourceName(requestName string) string {
splitName := strings.SplitN(requestName, "/", 2)
resourceType, name := splitName[0], splitName[1]
resourceName := fmt.Sprintf("resources/%s/types/%s", name, resourceType)

return resourceName
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *service) ProbeSourceConnectors(ctx context.Context, cancel context.Canc
}

for _, connector := range connectors {
resourceName := util.ConvertConnectorToResourceName(connector.Name)
resourceName := util.ConvertRequestToResourceName(connector.Name)
workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName)
// check if there is an ongoing workflow
if workflowId != nil {
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *service) ProbeDestinationConnectors(ctx context.Context, cancel context
}

for _, connector := range connectors {
resourceName := util.ConvertConnectorToResourceName(connector.Name)
resourceName := util.ConvertRequestToResourceName(connector.Name)
workflowId, _ := s.GetResourceWorkflowId(ctx, resourceName)
// check if there is an ongoing workflow
if workflowId != nil {
Expand Down
186 changes: 63 additions & 123 deletions pkg/service/mock_model_client_test.go

Large diffs are not rendered by default.

44 changes: 6 additions & 38 deletions pkg/service/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,59 +41,27 @@ func (s *service) ProbeModels(ctx context.Context, cancel context.CancelFunc) er
models = append(models, resp.Models...)
}

modelInstances := []*modelPB.ModelInstance{}
for _, model := range models {
view := modelPB.View_VIEW_FULL
resp, err := s.modelPublicClient.ListModelInstances(ctx, &modelPB.ListModelInstancesRequest{
Parent: model.Name,
View: &view,
})
if err != nil {
return err
}

nextPageToken := &resp.NextPageToken
totalSize := resp.TotalSize
modelInstances = append(modelInstances, resp.Instances...)

for totalSize > repository.DefaultPageSize {
resp, err := s.modelPublicClient.ListModelInstances(ctx, &modelPB.ListModelInstancesRequest{
Parent: model.Name,
PageToken: nextPageToken,
View: &view,
})

if err != nil {
return err
}

nextPageToken = &resp.NextPageToken
totalSize -= repository.DefaultPageSize
modelInstances = append(modelInstances, resp.Instances...)
}
}

for _, modelInstance := range modelInstances {
resp, err := s.modelPrivateClient.CheckModelInstance(ctx, &modelPB.CheckModelInstanceRequest{
Name: modelInstance.Name,
resp, err := s.modelPrivateClient.CheckModel(ctx, &modelPB.CheckModelRequest{
Name: model.Name,
})

if err != nil {
return err
}

err = s.UpdateResourceState(ctx, &controllerPB.Resource{
Name: util.ConvertModelToResourceName(modelInstance.Name),
State: &controllerPB.Resource_ModelInstanceState{
ModelInstanceState: resp.State,
Name: util.ConvertRequestToResourceName(model.Name),
State: &controllerPB.Resource_ModelState{
ModelState: resp.State,
},
})

if err != nil {
return err
}

logResp, _ := s.GetResourceState(ctx, util.ConvertModelToResourceName(modelInstance.Name))
logResp, _ := s.GetResourceState(ctx, util.ConvertRequestToResourceName(model.Name))
logger.Info(fmt.Sprintf("[Controller] Got %v", logResp))
}
return nil
Expand Down
26 changes: 13 additions & 13 deletions pkg/service/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *service) ProbePipelines(ctx context.Context, cancel context.CancelFunc)
}

for _, pipeline := range pipelines {
resourceName := util.ConvertPipelineToResourceName(pipeline.Name)
resourceName := util.ConvertRequestToResourceName(pipeline.Name)

pipelineResource := controllerPB.Resource{
Name: resourceName,
Expand All @@ -58,32 +58,32 @@ func (s *service) ProbePipelines(ctx context.Context, cancel context.CancelFunc)

var resources []*controllerPB.Resource

sourceConnectorResource, err := s.GetResourceState(ctx, util.ConvertConnectorToResourceName(pipeline.Recipe.Source))
sourceConnectorResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(pipeline.Recipe.Source))
if err != nil {
s.UpdateResourceState(ctx, &pipelineResource)
logger.Error("no record find for source connector")
return err
}
resources = append(resources, sourceConnectorResource)

destinationConnectorResource, err := s.GetResourceState(ctx, util.ConvertConnectorToResourceName(pipeline.Recipe.Destination))
destinationConnectorResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(pipeline.Recipe.Destination))
if err != nil {
s.UpdateResourceState(ctx, &pipelineResource)
logger.Error("no record find for destination connector")
return err
}
resources = append(resources, destinationConnectorResource)

modelInstanceNames := pipeline.Recipe.ModelInstances
for _, modelInstanceName := range modelInstanceNames {
modelInstanceResource, err := s.GetResourceState(ctx, util.ConvertModelToResourceName(modelInstanceName))
modelNames := pipeline.Recipe.Models
for _, modelName := range modelNames {
modelResource, err := s.GetResourceState(ctx, util.ConvertRequestToResourceName(modelName))
if err != nil {
s.UpdateResourceState(ctx, &pipelineResource)
logger.Error(fmt.Sprintf("no record find for model instance %v", modelInstanceName))
logger.Error(fmt.Sprintf("no record find for model %v", modelName))
return err
}

resources = append(resources, modelInstanceResource)
resources = append(resources, modelResource)
}

for _, r := range resources {
Expand All @@ -105,17 +105,17 @@ func (s *service) ProbePipelines(ctx context.Context, cancel context.CancelFunc)
default:
continue
}
case *controllerPB.Resource_ModelInstanceState:
switch v.ModelInstanceState {
case modelPB.ModelInstance_STATE_OFFLINE:
case *controllerPB.Resource_ModelState:
switch v.ModelState {
case modelPB.Model_STATE_OFFLINE:
pipelineResource.State = &controllerPB.Resource_PipelineState{
PipelineState: pipelinePB.Pipeline_STATE_INACTIVE,
}
case modelPB.ModelInstance_STATE_UNSPECIFIED:
case modelPB.Model_STATE_UNSPECIFIED:
pipelineResource.State = &controllerPB.Resource_PipelineState{
PipelineState: pipelinePB.Pipeline_STATE_UNSPECIFIED,
}
case modelPB.ModelInstance_STATE_ERROR:
case modelPB.Model_STATE_ERROR:
pipelineResource.State = &controllerPB.Resource_PipelineState{
PipelineState: pipelinePB.Pipeline_STATE_ERROR,
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (s *service) GetResourceState(ctx context.Context, resourceName string) (*c
case util.RESOURCE_TYPE_MODEL:
return &controllerPB.Resource{
Name: resourceName,
State: &controllerPB.Resource_ModelInstanceState{
ModelInstanceState: modelPB.ModelInstance_State(stateEnumValue),
State: &controllerPB.Resource_ModelState{
ModelState: modelPB.Model_State(stateEnumValue),
},
Progress: nil,
}, nil
Expand Down Expand Up @@ -135,7 +135,7 @@ func (s *service) UpdateResourceState(ctx context.Context, resource *controllerP

switch resourceType {
case util.RESOURCE_TYPE_MODEL:
state = int(resource.GetModelInstanceState())
state = int(resource.GetModelState())
case util.RESOURCE_TYPE_PIPELINE:
state = int(resource.GetPipelineState())
case util.RESOURCE_TYPE_SOURCE_CONNECTOR, util.RESOURCE_TYPE_DESTINATION_CONNECTOR:
Expand All @@ -160,7 +160,7 @@ func (s *service) UpdateResourceState(ctx context.Context, resource *controllerP
if !opInfo.Done {
switch resourceType {
case util.RESOURCE_TYPE_MODEL:
state = int(modelPB.ModelInstance_STATE_UNSPECIFIED)
state = int(modelPB.Model_STATE_UNSPECIFIED)
case util.RESOURCE_TYPE_PIPELINE:
state = int(pipelinePB.Pipeline_STATE_UNSPECIFIED)
case util.RESOURCE_TYPE_SOURCE_CONNECTOR, util.RESOURCE_TYPE_DESTINATION_CONNECTOR:
Expand Down
6 changes: 3 additions & 3 deletions pkg/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestGetResourceState(t *testing.T) {

resource, err := s.GetResourceState(ctx, modelResourceName)

assert.Equal(t, modelPB.ModelInstance_STATE_UNSPECIFIED, resource.GetModelInstanceState())
assert.Equal(t, modelPB.Model_STATE_UNSPECIFIED, resource.GetModelState())

assert.NoError(t, err)
})
Expand Down Expand Up @@ -251,8 +251,8 @@ func TestUpdateResourceState(t *testing.T) {

resource := controllerPB.Resource{
Name: modelResourceName,
State: &controllerPB.Resource_ModelInstanceState{
ModelInstanceState: modelPB.ModelInstance_STATE_UNSPECIFIED,
State: &controllerPB.Resource_ModelState{
ModelState: modelPB.Model_STATE_UNSPECIFIED,
},
}

Expand Down