Skip to content

Commit

Permalink
增加neo4j Repository
Browse files Browse the repository at this point in the history
  • Loading branch information
liuxd6825 committed Jul 21, 2022
1 parent 9d53fac commit 0ea1ccb
Show file tree
Hide file tree
Showing 18 changed files with 979 additions and 12 deletions.
16 changes: 10 additions & 6 deletions ddd/ddd_repository/ddd_mongodb/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,19 @@ type Repository[T ddd.Entity] struct {
}

func NewRepository[T ddd.Entity](newFun func() T, mongodb *MongoDB, collection *mongo.Collection) *Repository[T] {
return &Repository[T]{
newFun: newFun,
collection: collection,
mongodb: mongodb,
}
r := &Repository[T]{}
r.Init(newFun, mongodb, collection)
return r
}

func (r *Repository[T]) Init(newFun func() T, mongodb *MongoDB, collection *mongo.Collection) {
r.newFun = newFun
r.collection = collection
r.mongodb = mongodb
}

func (r *Repository[T]) NewEntity() T {
return r.newFun()
return T{}
}

func (r *Repository[T]) NewEntityList() *[]T {
Expand Down
61 changes: 61 additions & 0 deletions ddd/ddd_repository/ddd_neo4j/base_element.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package ddd_neo4j

import "github.com/liuxd6825/dapr-go-ddd-sdk/ddd"

type ElementEntity interface {
ddd.Entity
SetNid(int2 int64)
GetNid() int64

SetLabels([]string)
GetLabels() []string

GetId() string
SetId(string)

SetTenantId(string)
GetTenantId() string
}

type BaseElement struct {
Id string `json:"id"`
TenantId string `json:"tenantId"`
Nid int64 `json:"-"`
Labels []string `json:"-"`
}

func (b *BaseElement) SetNid(int2 int64) {
b.Nid = int2
}

func (b *BaseElement) GetNid() int64 {
return b.Nid
}

func (b *BaseElement) SetLabels(strings []string) {
b.Labels = strings
}

func (b *BaseElement) GetLabels() []string {
return b.Labels
}

func (b *BaseElement) SetTenantId(s string) {
b.TenantId = s
}

func (b *BaseElement) GetTenantId() string {
return b.TenantId
}

func (b *BaseElement) SetId(s string) {
b.Id = s
}

func (b *BaseElement) GetId() string {
return b.Id
}

func newElementEntity() ElementEntity {
return &BaseElement{}
}
1 change: 1 addition & 0 deletions ddd/ddd_repository/ddd_neo4j/base_graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package ddd_neo4j
9 changes: 9 additions & 0 deletions ddd/ddd_repository/ddd_neo4j/base_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ddd_neo4j

type NodeEntity interface {
ElementEntity
}

type BaseNode struct {
BaseElement
}
1 change: 1 addition & 0 deletions ddd/ddd_repository/ddd_neo4j/base_node_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package ddd_neo4j
1 change: 1 addition & 0 deletions ddd/ddd_repository/ddd_neo4j/base_path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package ddd_neo4j
49 changes: 49 additions & 0 deletions ddd/ddd_repository/ddd_neo4j/base_relationship.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ddd_neo4j

type RelationshipEntity interface {
ElementEntity

SetType(string)
GetType() string

SetStartId(int64)
GetStartId() int64

SetEndId(int64)
GetEndId() int64
}

func NewRelationshipEntity() RelationshipEntity {
return &BaseRelationship{}
}

type BaseRelationship struct {
BaseElement
Type string
StartId int64
EndId int64
}

func (b *BaseRelationship) SetType(s string) {
b.Type = s
}

func (b *BaseRelationship) GetType() string {
return b.Type
}

func (b *BaseRelationship) SetStartId(i int64) {
b.StartId = i
}

func (b *BaseRelationship) GetStartId() int64 {
return b.StartId
}

func (b *BaseRelationship) SetEndId(i int64) {
b.EndId = i
}

func (b *BaseRelationship) GetEndId() int64 {
return b.EndId
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package ddd_neo4j
239 changes: 239 additions & 0 deletions ddd/ddd_repository/ddd_neo4j/base_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package ddd_neo4j

import (
"context"
"errors"
"github.com/liuxd6825/dapr-go-ddd-sdk/assert"
"github.com/liuxd6825/dapr-go-ddd-sdk/ddd"
"github.com/liuxd6825/dapr-go-ddd-sdk/ddd/ddd_repository"
"github.com/liuxd6825/dapr-go-ddd-sdk/utils/reflectutils"
"github.com/neo4j/neo4j-go-driver/v4/neo4j"
"log"
)

type Neo4jEntity interface {
ddd.Entity
}

type BaseRepository[T ElementEntity] struct {
driver neo4j.Driver
cypherBuilder CypherBuilder
}

type SessionOptions struct {
AccessMode *neo4j.AccessMode
}

func NewBaseRepository[T ElementEntity](driver neo4j.Driver, builder CypherBuilder) *BaseRepository[T] {
base := &BaseRepository[T]{}
return base.Init(driver, builder)
}

func (r *BaseRepository[T]) Init(driver neo4j.Driver, builder CypherBuilder) *BaseRepository[T] {
r.driver = driver
r.cypherBuilder = builder
return r
}

func (r *BaseRepository[T]) Insert(ctx context.Context, entity T, opts ...*ddd_repository.SetOptions) *ddd_repository.SetResult[T] {
cypher, params, err := r.cypherBuilder.CreateOne(ctx, entity)
res, err := r.doSet(ctx, entity.GetTenantId(), cypher, params, opts...)
if err := res.GetOne("", entity); err != nil {
return ddd_repository.NewSetResultError[T](err)
}
return ddd_repository.NewSetResult(entity, err)
}

func (r *BaseRepository[T]) Update(ctx context.Context, entity T, opts ...*ddd_repository.SetOptions) *ddd_repository.SetResult[T] {
cypher, params, err := r.cypherBuilder.UpdateById(ctx, entity)
res, err := r.doSet(ctx, entity.GetTenantId(), cypher, params, opts...)
if err := res.GetOne("", entity); err != nil {
return ddd_repository.NewSetResultError[T](err)
}
return ddd_repository.NewSetResult(entity, err)
}

func (r *BaseRepository[T]) UpdateMany(ctx context.Context, list *[]T, opts ...*ddd_repository.SetOptions) *ddd_repository.SetManyResult[T] {
for _, entity := range *list {
if cypher, params, err := r.cypherBuilder.UpdateById(ctx, entity); err != nil {
return ddd_repository.NewSetManyResultError[T](err)
} else {
if res, err := r.doSet(ctx, entity.GetTenantId(), cypher, params, opts...); err != nil {
return ddd_repository.NewSetManyResultError[T](err)
} else if err := res.GetOne("n", entity); err != nil {
return ddd_repository.NewSetManyResultError[T](err)
}
}
}
return ddd_repository.NewSetManyResult(list, nil)
}

func (r *BaseRepository[T]) DeleteById(ctx context.Context, entity T, opts ...*ddd_repository.SetOptions) *ddd_repository.SetResult[T] {
cypher, params, err := r.cypherBuilder.DeleteById(ctx, entity)
_, err = r.doSet(ctx, entity.GetTenantId(), cypher, params, opts...)
return ddd_repository.NewSetResult(entity, err)
}

func (r *BaseRepository[T]) FindById(ctx context.Context, tenantId, id string) (T, error) {
var null T
cypher, err := r.cypherBuilder.FindById(ctx, tenantId, id)
if err != nil {
return null, err
}
result, err := r.Query(ctx, cypher)
entity := reflectutils.NewStruct[T]()
if err := result.GetOne("", entity); err != nil {
return null, err
}
return entity.(T), nil
}

func (r *BaseRepository[T]) doSet(ctx context.Context, tenantId string, cypher string, params map[string]interface{}, opts ...*ddd_repository.SetOptions) (*Result, error) {
if err := assert.NotEmpty(tenantId, assert.NewOptions("tenantId is empty")); err != nil {
return nil, err
}
res, err := r.doSession(ctx, func(tx neo4j.Transaction) (*Result, error) {
r, err := tx.Run(cypher, params)
return NewResult(r), err
})
return res, err
}

func (r *BaseRepository[T]) getLabels(entity ElementEntity) string {
label := ""
for _, l := range entity.GetLabels() {
label = label + " :" + l
}
return label
}

func (r *BaseRepository[T]) Write(ctx context.Context, cypher string) (*Result, error) {
return r.doSession(ctx, func(tx neo4j.Transaction) (*Result, error) {
result, err := tx.Run(cypher, nil)
if err != nil {
return nil, err
}
return NewResult(result), err
})
}

func (r *BaseRepository[T]) Query(ctx context.Context, cypher string) (*Result, error) {
var resultData *Result
_, err := r.doSession(ctx, func(tx neo4j.Transaction) (*Result, error) {
result, err := tx.Run(cypher, nil)
if err != nil {
log.Println("wirte to DB with error:", err)
return nil, err
}
resultData = NewResult(result)
return nil, err
})
return resultData, err
}

func (r *BaseRepository[T]) doSession(ctx context.Context, fun func(tx neo4j.Transaction) (*Result, error), opts ...*SessionOptions) (*Result, error) {
if fun == nil {
return nil, errors.New("doSession(ctx, fun) fun is nil")
}
if sc, ok := GetSessionContext(ctx); ok {
tx := sc.GetTransaction()
_, err := fun(tx)
return nil, err
}

opt := NewSessionOptions()
opt.Merge(opts...)
opt.setDefault()

session := r.driver.NewSession(neo4j.SessionConfig{AccessMode: *opt.AccessMode})
defer func() {
_ = session.Close()
}()

var res interface{}
var err error
if *opt.AccessMode == neo4j.AccessModeRead {
res, err = session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) {
return fun(tx)
})
} else if *opt.AccessMode == neo4j.AccessModeWrite {
res, err = session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) {
return fun(tx)
})
}

if result, ok := res.(*Result); ok {
return result, err
}

return nil, err
}

func (r *BaseRepository[T]) FindPaging(ctx context.Context, query ddd_repository.FindPagingQuery, opts ...*ddd_repository.FindOptions) *ddd_repository.FindPagingResult[T] {
/* return r.DoFilter(query.GetTenantId(), query.GetFilter(), func(filter map[string]interface{}) (*ddd_repository.FindPagingResult[T], bool, error) {
if err := assert.NotEmpty(query.GetTenantId(), assert.NewOptions("tenantId is empty")); err != nil {
return nil, false, err
}
data := r.NewEntityList()
findOptions := getFindOptions(opts...)
if query.GetPageSize() > 0 {
findOptions.SetLimit(query.GetPageSize())
findOptions.SetSkip(query.GetPageSize() * query.GetPageNum())
}
if len(query.GetSort()) > 0 {
sort, err := r.getSort(query.GetSort())
if err != nil {
return nil, false, err
}
findOptions.SetSort(sort)
}
cursor, err := r.collection.Find(ctx, filter, findOptions)
if err != nil {
return nil, false, err
}
err = cursor.All(ctx, data)
totalRows, err := r.collection.CountDocuments(ctx, filter)
findData := ddd_repository.NewFindPagingResult[T](data, totalRows, query, err)
return findData, true, err
})*/
return nil
}

func (r *BaseRepository[T]) NewSetManyResult(result *Result, err error) *ddd_repository.SetManyResult[T] {
if err != nil {
return ddd_repository.NewSetManyResultError[T](err)
}
var data []T
if err := result.GetList("n", &data); err != nil {
ddd_repository.NewSetResultError[T](err)
}
return ddd_repository.NewSetManyResult[T](&data, err)
}

func NewSessionOptions() *SessionOptions {
return &SessionOptions{}
}

func (r *SessionOptions) SetAccessMode(accessMode neo4j.AccessMode) {
r.AccessMode = &accessMode
}

func (r *SessionOptions) Merge(opts ...*SessionOptions) {
for _, o := range opts {
if o == nil {
continue
}
if o.AccessMode != nil {
r.SetAccessMode(*o.AccessMode)
}
}
}

func (r *SessionOptions) setDefault() {
if r.AccessMode == nil {
r.SetAccessMode(neo4j.AccessModeWrite)
}
}
Loading

0 comments on commit 0ea1ccb

Please sign in to comment.