Skip to content

Commit

Permalink
Extract Delete / Insert / Select / Update operations from GeodeBacken…
Browse files Browse the repository at this point in the history
…d into separate (local) classes
  • Loading branch information
asereda-gs committed Oct 16, 2019
1 parent 8ea4236 commit c3e3780
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 137 deletions.
149 changes: 12 additions & 137 deletions criteria/geode/src/org/immutables/criteria/geode/GeodeBackend.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,60 +16,41 @@

package org.immutables.criteria.geode;

import com.google.common.base.Preconditions;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.Single;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.Struct;
import org.immutables.criteria.backend.Backend;
import org.immutables.criteria.backend.BackendException;
import org.immutables.criteria.backend.DefaultResult;
import org.immutables.criteria.backend.IdExtractor;
import org.immutables.criteria.backend.IdResolver;
import org.immutables.criteria.backend.ProjectedTuple;
import org.immutables.criteria.backend.StandardOperations;
import org.immutables.criteria.backend.WatchEvent;
import org.immutables.criteria.backend.WriteResult;
import org.immutables.criteria.expression.AggregationCall;
import org.immutables.criteria.expression.Expression;
import org.immutables.criteria.expression.Path;
import org.immutables.criteria.expression.Query;
import org.reactivestreams.Publisher;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Backend for <a href="https://geode.apache.org/">Apache Geode</a>
*/
public class GeodeBackend implements Backend {

private static final Logger logger = Logger.getLogger(GeodeBackend.class.getName());
static final Logger logger = Logger.getLogger(GeodeBackend.class.getName());

private final RegionResolver resolver;
private final IdResolver idResolver;

/**
* Convert Geode specific {@link QueryService#UNDEFINED} value to null
*/
private final static Function<Object, Object> UNDEFINED_TO_NULL = value -> QueryService.UNDEFINED.equals(value) ? null : value;


public GeodeBackend(GeodeSetup setup) {
Objects.requireNonNull(setup, "setup");
Expand All @@ -85,14 +66,13 @@ public Backend.Session open(Class<?> entityType) {
return new Session(entityType, idResolver, region);
}

@SuppressWarnings("unchecked")
private static class Session implements Backend.Session {
static class Session implements Backend.Session {

private final Class<?> entityType;
private final Region<Object, Object> region;
private final IdExtractor idExtractor;
private final IdResolver idResolver;
private final QueryService queryService;
final Class<?> entityType;
final Region<Object, Object> region;
final IdExtractor idExtractor;
final IdResolver idResolver;
final QueryService queryService;

private Session(Class<?> entityType, IdResolver idResolver, Region<Object, Object> region) {
this.entityType = Objects.requireNonNull(entityType, "entityType");
Expand All @@ -114,13 +94,13 @@ public Result execute(Operation operation) {

private Publisher<?> executeInternal(Operation operation) {
if (operation instanceof StandardOperations.Select) {
return query((StandardOperations.Select) operation);
return Flowable.fromCallable(new SyncSelect(this, (StandardOperations.Select) operation)).flatMapIterable(x -> x);
} else if (operation instanceof StandardOperations.Update) {
return update((StandardOperations.Update) operation);
return Flowable.fromCallable(new SyncUpdate(this, (StandardOperations.Update) operation));
} else if (operation instanceof StandardOperations.Insert) {
return insert((StandardOperations.Insert) operation);
return Flowable.fromCallable(new SyncInsert(this, (StandardOperations.Insert) operation));
} else if (operation instanceof StandardOperations.Delete) {
return delete((StandardOperations.Delete) operation);
return Flowable.fromCallable(new SyncDelete(this, (StandardOperations.Delete) operation));
} else if (operation instanceof StandardOperations.Watch) {
return watch((StandardOperations.Watch) operation);
}
Expand All @@ -129,111 +109,6 @@ private Publisher<?> executeInternal(Operation operation) {
operation, GeodeBackend.class.getSimpleName())));
}

private Flowable<?> query(StandardOperations.Select op) {
// for projections use tuple function
Function<Object, Object> tupleFn = op.query().hasProjections() ? obj -> Geodes.castNumbers(toTuple(op.query(), obj)) : x -> x;

return Flowable.fromCallable(() -> {
OqlWithVariables oql = toOql(op.query(), true);
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "Querying Geode with {0}", oql);
}
Iterable<Object> result = (Iterable<Object>) queryService.newQuery(oql.oql()).execute(oql.variables().toArray(new Object[0]));
// conversion to tuple should happen before rxjava because it doesn't allow nulls
return StreamSupport.stream(result.spliterator(), false).map(tupleFn).collect(Collectors.toList());
})
.flatMapIterable(x -> x);
}

private static ProjectedTuple toTuple(Query query, Object value) {
if (!(value instanceof Struct)) {
// most likely single projection
Preconditions.checkArgument(query.projections().size() == 1, "Expected single projection got %s", query.projections().size());
Expression projection = query.projections().get(0);
return ProjectedTuple.ofSingle(projection, UNDEFINED_TO_NULL.apply(value));
}

Struct struct = (Struct) value;
List<Object> values = Arrays.stream(struct.getFieldValues()).map(UNDEFINED_TO_NULL).collect(Collectors.toList());
return ProjectedTuple.of(query.projections(), values);
}

private Flowable<WriteResult> update(StandardOperations.Update op) {
if (op.values().isEmpty()) {
return Flowable.just(WriteResult.empty());
}

Map<Object, Object> toInsert = op.values().stream().collect(Collectors.toMap(idExtractor::extract, x -> x));
Region<Object, Object> region = this.region;

// use putAll for upsert
if (op.upsert()) {
return Flowable.fromCallable(() -> {
region.putAll(toInsert);
return WriteResult.unknown();
});
}

// use replace for update (after extracting ids)
return Flowable.fromCallable(() -> {
long inserted = 0;
long updated = 0;
for (Map.Entry<Object, Object> entry: toInsert.entrySet()) {
if (region.replace(entry.getKey(), entry.getValue()) == null) {
inserted++;
} else {
updated++;
}
}
return GeodeWriteResult.of().withInsertedCount(inserted).withUpdatedCount(updated);
});
}

private Flowable<WriteResult> insert(StandardOperations.Insert op) {
if (op.values().isEmpty()) {
return Flowable.just(WriteResult.empty());
}

final Map<Object, Object> toInsert = op.values().stream().collect(Collectors.toMap(idExtractor::extract, x -> x));
final Region<Object, Object> region = this.region;
for (Map.Entry<Object, Object> entry: toInsert.entrySet()) {
Object previous = region.putIfAbsent(entry.getKey(), entry.getValue());
if (previous != null) {
return Flowable.error(new BackendException(String.format("Duplicate id %s for %s", entry.getKey(), entityType())));
}
}
return Flowable.just(GeodeWriteResult.of().withInsertedCount(toInsert.size()));
}

private <T> Flowable<WriteResult> delete(StandardOperations.Delete op) {
if (!op.query().filter().isPresent()) {
// no filter means delete all (ie clear whole region)
return Completable.fromRunnable(region::clear)
.toSingleDefault(WriteResult.unknown())
.toFlowable();
}

final Expression filter = op.query().filter().orElseThrow(() -> new IllegalStateException("For " + op));
final Optional<List<?>> ids = Geodes.canDeleteByKey(filter, idResolver);
// list of ids is present in the expression
if (ids.isPresent()) {
// delete by key: map.remove(key)
return Completable.fromRunnable(() -> region.removeAll(ids.get()))
.toSingleDefault(WriteResult.unknown())
.toFlowable();
}

final GeodeQueryVisitor visitor = new GeodeQueryVisitor(true, path -> String.format("e.value.%s", path.toStringPath()));
final OqlWithVariables oql = filter.accept(visitor);

final String query = String.format("select distinct e.key from %s.entries e where %s", region.getFullPath(), oql.oql());

return Single.fromCallable(() -> queryService.newQuery(query).execute(oql.variables().toArray(new Object[0])))
.flatMapCompletable(list -> Completable.fromRunnable(() -> region.removeAll((Collection<Object>) list)))
.toSingleDefault(WriteResult.unknown())
.toFlowable();
}

private <T> Publisher<WatchEvent<T>> watch(StandardOperations.Watch operation) {
return Flowable.create(e -> {
final FlowableEmitter<WatchEvent<T>> emitter = e.serialize();
Expand All @@ -246,7 +121,7 @@ private <T> Publisher<WatchEvent<T>> watch(StandardOperations.Watch operation) {
}, BackpressureStrategy.ERROR);
}

private OqlWithVariables toOql(Query query, boolean useBindVariables) {
OqlWithVariables toOql(Query query, boolean useBindVariables) {
final StringBuilder oql = new StringBuilder("SELECT");
if (!query.hasProjections()) {
oql.append(" * ");
Expand Down
67 changes: 67 additions & 0 deletions criteria/geode/src/org/immutables/criteria/geode/SyncDelete.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2019 Immutables Authors and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.immutables.criteria.geode;

import org.apache.geode.cache.Region;
import org.immutables.criteria.backend.StandardOperations;
import org.immutables.criteria.backend.WriteResult;
import org.immutables.criteria.expression.Expression;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;

class SyncDelete implements Callable<WriteResult> {

private final GeodeBackend.Session session;
private final StandardOperations.Delete operation;
private final Region<Object, Object> region;

SyncDelete(GeodeBackend.Session session, StandardOperations.Delete operation) {
this.session = session;
this.operation = operation;
this.region = session.region;
}

@Override
public WriteResult call() throws Exception {
if (!operation.query().filter().isPresent()) {
// no filter means delete all (ie clear whole region)
region.clear();
return WriteResult.unknown();
}

Expression filter = operation.query().filter().orElseThrow(() -> new IllegalStateException("For " + operation));
Optional<List<?>> ids = Geodes.canDeleteByKey(filter, session.idResolver);

// list of ids is present in the expression
if (ids.isPresent()) {
// delete by key: map.remove(key)
region.removeAll(ids.get());
return WriteResult.unknown();
}

GeodeQueryVisitor visitor = new GeodeQueryVisitor(true, path -> String.format("e.value.%s", path.toStringPath()));
OqlWithVariables oql = filter.accept(visitor);

String query = String.format("select distinct e.key from %s.entries e where %s", region.getFullPath(), oql.oql());
Collection<?> keys = (Collection<?>) session.queryService.newQuery(query).execute(oql.variables().toArray(new Object[0]));
region.removeAll(keys);
return GeodeWriteResult.of().withDeletedCount(keys.size());
}
}
57 changes: 57 additions & 0 deletions criteria/geode/src/org/immutables/criteria/geode/SyncInsert.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2019 Immutables Authors and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.immutables.criteria.geode;

import org.apache.geode.cache.Region;
import org.immutables.criteria.backend.BackendException;
import org.immutables.criteria.backend.StandardOperations;
import org.immutables.criteria.backend.WriteResult;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

class SyncInsert implements Callable<WriteResult> {

private final GeodeBackend.Session session;
private final StandardOperations.Insert operation;
private final Region<Object, Object> region;

SyncInsert(GeodeBackend.Session session, StandardOperations.Insert operation) {
this.session = session;
this.operation = operation;
this.region = session.region;
}

@Override
public WriteResult call() throws Exception {
if (operation.values().isEmpty()) {
return WriteResult.empty();
}

final Map<Object, Object> toInsert = operation.values().stream().collect(Collectors.toMap(session.idExtractor::extract, x -> x));
final Region<Object, Object> region = this.region;
for (Map.Entry<Object, Object> entry: toInsert.entrySet()) {
Object previous = region.putIfAbsent(entry.getKey(), entry.getValue());
if (previous != null) {
throw new BackendException(String.format("Duplicate id %s for %s", entry.getKey(), session.entityType()));
}
}

return GeodeWriteResult.of().withInsertedCount(toInsert.size());
}
}
Loading

0 comments on commit c3e3780

Please sign in to comment.