Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ class MongoStaticApi<D> extends GormStaticApi<D> implements MongoAllOperations<D
withSession { AbstractMongoSession session ->
def entity = session.mappingContext.getPersistentEntity(persistentClass.name)
filter = wrapFilterWithMultiTenancy(filter)
return session.getCollection(entity)
MongoCollection<D> collection = session.getCollection(entity)
.withDocumentClass(persistentClass)
.find(filter)
return session.find(collection, filter)
}
}

Expand All @@ -84,10 +84,8 @@ class MongoStaticApi<D> extends GormStaticApi<D> implements MongoAllOperations<D
filter = wrapFilterWithMultiTenancy(filter)
MongoCollection<D> mongoCollection = session.getCollection(entity)
.withDocumentClass(persistentClass)
D result = options ? mongoCollection
.findOneAndDelete(filter, options) :
mongoCollection
.findOneAndDelete(filter)
D result = options ? session.findOneAndDelete(mongoCollection, filter, options) :
session.findOneAndDelete(mongoCollection, filter)

return result
}
Expand All @@ -97,8 +95,7 @@ class MongoStaticApi<D> extends GormStaticApi<D> implements MongoAllOperations<D
withSession { AbstractMongoSession session ->
def entity = session.mappingContext.getPersistentEntity(persistentClass.name)
filter = wrapFilterWithMultiTenancy(filter)
return session.getCollection(entity)
.countDocuments(filter)
return session.countDocuments(session.getCollection(entity), filter)
}
}

Expand Down Expand Up @@ -201,7 +198,7 @@ class MongoStaticApi<D> extends GormStaticApi<D> implements MongoAllOperations<D
}

List<? extends Bson> newPipeline = preparePipeline(pipeline)
AggregateIterable aggregateIterable = mongoCollection.aggregate(newPipeline)
AggregateIterable aggregateIterable = session.aggregate(mongoCollection, newPipeline)
if (doWithAggregate != null) {
aggregateIterable = doWithAggregate.apply(aggregateIterable)
}
Expand All @@ -216,7 +213,7 @@ class MongoStaticApi<D> extends GormStaticApi<D> implements MongoAllOperations<D
List<? extends Bson> newPipeline = preparePipeline(pipeline)
def mongoCollection = session.getCollection(persistentEntity)
.withReadPreference(readPreference)
def aggregateIterable = mongoCollection.aggregate(newPipeline)
def aggregateIterable = session.aggregate(mongoCollection, newPipeline)
if (doWithAggregate != null) {
aggregateIterable = doWithAggregate.apply(aggregateIterable)
}
Expand All @@ -243,7 +240,7 @@ class MongoStaticApi<D> extends GormStaticApi<D> implements MongoAllOperations<D
search = Filters.text(query)
}
search = wrapFilterWithMultiTenancy(search)
FindIterable cursor = coll.find(search)
FindIterable cursor = session.find(coll, search)

int offset = options.offset instanceof Number ? ((Number)options.offset).intValue() : 0
int max = options.max instanceof Number ? ((Number)options.max).intValue() : -1
Expand Down Expand Up @@ -277,7 +274,7 @@ class MongoStaticApi<D> extends GormStaticApi<D> implements MongoAllOperations<D

def score = Projections.metaTextScore('score')
search = wrapFilterWithMultiTenancy(search)
FindIterable cursor = coll.find(search)
FindIterable cursor = session.find(coll, search)
.projection(score)
.sort(score)
.limit(limit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@
import java.util.concurrent.ConcurrentHashMap;

import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.ClientSession;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.FindOneAndDeleteOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import org.bson.Document;
import org.bson.conversions.Bson;

import org.springframework.context.ApplicationEventPublisher;

Expand All @@ -31,6 +41,8 @@
import org.grails.datastore.mapping.model.PersistentEntity;
import org.grails.datastore.mapping.mongo.config.MongoCollection;
import org.grails.datastore.mapping.mongo.config.MongoMappingContext;
import org.grails.datastore.mapping.transactions.SessionOnlyTransaction;
import org.grails.datastore.mapping.transactions.Transaction;

/**
* Abstract implementation on the {@link org.grails.datastore.mapping.core.Session} interface for MongoDB
Expand All @@ -47,6 +59,7 @@ public abstract class AbstractMongoSession extends AbstractSession<MongoClient>
protected MongoDatastore mongoDatastore;
protected WriteConcern writeConcern = null;
protected boolean errorOccured = false;
protected ClientSession clientSession;
protected Map<PersistentEntity, String> mongoCollections = new ConcurrentHashMap<>();
protected Map<PersistentEntity, String> mongoDatabases = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -200,6 +213,120 @@ public MongoMappingContext getMappingContext() {
return (MongoMappingContext) super.getMappingContext();
}

/**
* @return the active {@link ClientSession} for the current MongoDB transaction, or {@code null}
* if no server-side transaction is in progress
*/
public ClientSession getClientSession() {
return clientSession;
}

/**
* @return {@code true} if a server-side MongoDB transaction is currently active on this session
*/
public boolean hasActiveTransaction() {
return clientSession != null && clientSession.hasActiveTransaction();
}

/**
* Detaches the {@link ClientSession} from this session once its transaction has completed.
* Called by {@link MongoTransaction} after commit or rollback closes the session.
*/
void clearClientSession() {
this.clientSession = null;
}

/**
* Closes and detaches the {@link ClientSession} if one is still attached. Used defensively when a
* transaction did not complete through {@link MongoTransaction}, so a session is never leaked.
*/
protected void closeClientSessionQuietly() {
if (clientSession != null) {
try {
clientSession.close();
}
catch (RuntimeException ignored) {
// best effort
}
finally {
clientSession = null;
}
}
}

@Override
public void disconnect() {
try {
closeClientSessionQuietly();
}
finally {
super.disconnect();
}
}

@Override
protected Transaction beginTransactionInternal() {
if (getDatastore().isTransactionsEnabled()) {
// Defensive: if a previous transaction did not complete cleanly, close its orphaned
// session before starting a new one so it cannot leak.
closeClientSessionQuietly();
ClientSession session = getNativeInterface().startSession();
try {
session.startTransaction();
}
catch (RuntimeException e) {
session.close();
throw e;
}
this.clientSession = session;
return new MongoTransaction(this, session);
}
return new SessionOnlyTransaction<>(getNativeInterface(), this);
}

// The driver exposes a session-less and a ClientSession overload for every operation, and the
// session argument cannot be null. These helpers branch once so call sites stay readable and
// behave identically (session-less) when no transaction is active.

@SuppressWarnings({"rawtypes", "unchecked"})
public BulkWriteResult bulkWrite(com.mongodb.client.MongoCollection collection, List<? extends WriteModel> writes) {
return clientSession != null ? collection.bulkWrite(clientSession, writes) : collection.bulkWrite(writes);
}

@SuppressWarnings({"rawtypes", "unchecked"})
public DeleteResult deleteMany(com.mongodb.client.MongoCollection collection, Bson filter) {
return clientSession != null ? collection.deleteMany(clientSession, filter) : collection.deleteMany(filter);
}

@SuppressWarnings({"rawtypes", "unchecked"})
public UpdateResult updateMany(com.mongodb.client.MongoCollection collection, Bson filter, Bson update, UpdateOptions options) {
return clientSession != null ? collection.updateMany(clientSession, filter, update, options) : collection.updateMany(filter, update, options);
}

public <T> FindIterable<T> find(com.mongodb.client.MongoCollection<T> collection, Bson filter) {
return clientSession != null ? collection.find(clientSession, filter) : collection.find(filter);
}

public <R> FindIterable<R> find(com.mongodb.client.MongoCollection<?> collection, Bson filter, Class<R> resultClass) {
return clientSession != null ? collection.find(clientSession, filter, resultClass) : collection.find(filter, resultClass);
}

public <T> AggregateIterable<T> aggregate(com.mongodb.client.MongoCollection<T> collection, List<? extends Bson> pipeline) {
return clientSession != null ? collection.aggregate(clientSession, pipeline) : collection.aggregate(pipeline);
}

public <T> T findOneAndDelete(com.mongodb.client.MongoCollection<T> collection, Bson filter) {
return clientSession != null ? collection.findOneAndDelete(clientSession, filter) : collection.findOneAndDelete(filter);
}

public <T> T findOneAndDelete(com.mongodb.client.MongoCollection<T> collection, Bson filter, FindOneAndDeleteOptions options) {
return clientSession != null ? collection.findOneAndDelete(clientSession, filter, options) : collection.findOneAndDelete(filter, options);
}

public long countDocuments(com.mongodb.client.MongoCollection<?> collection, Bson filter) {
return clientSession != null ? collection.countDocuments(clientSession, filter) : collection.countDocuments(filter);
}

/**
* Decodes the given entity type from the given native object type
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ import org.grails.datastore.mapping.mongo.engine.codecs.PersistentEntityCodec
import org.grails.datastore.mapping.mongo.query.MongoQuery
import org.grails.datastore.mapping.query.Query
import org.grails.datastore.mapping.query.api.QueryableCriteria
import org.grails.datastore.mapping.transactions.SessionOnlyTransaction
import org.grails.datastore.mapping.transactions.Transaction

/**
* A MongoDB session for codec mapping style
Expand Down Expand Up @@ -236,8 +234,7 @@ class MongoCodecSession extends AbstractMongoSession {
final List<WriteModel<?>> writes = writeModels[persistentEntity]
if (writes) {

final BulkWriteResult bulkWriteResult = collection
.bulkWrite(writes)
final BulkWriteResult bulkWriteResult = bulkWrite(collection, writes)

final boolean isAcknowledged = wc.isAcknowledged()
if (!bulkWriteResult.wasAcknowledged() && isAcknowledged) {
Expand Down Expand Up @@ -306,11 +303,6 @@ class MongoCodecSession extends AbstractMongoSession {
MongoIdCoercion.coerceIdToStoredType(nativeKey, entity)
}

@Override
protected Transaction beginTransactionInternal() {
return new SessionOnlyTransaction<MongoClient>(getNativeInterface(), this)
}

@Override
protected MongoCodecEntityPersister createPersister(Class cls, MappingContext mappingContext) {
return mongoCodecEntityPersisterMap[cls]
Expand All @@ -322,7 +314,7 @@ class MongoCodecSession extends AbstractMongoSession {
final Document nativeQuery = buildNativeDocumentQueryFromCriteria(criteria, entity)

final MongoCollection collection = getCollection(entity)
final DeleteResult deleteResult = collection.deleteMany((Bson) nativeQuery)
final DeleteResult deleteResult = deleteMany(collection, (Bson) nativeQuery)
if (deleteResult.wasAcknowledged()) {
return deleteResult.deletedCount
}
Expand All @@ -347,7 +339,7 @@ class MongoCodecSession extends AbstractMongoSession {
}
}
}
final UpdateResult updateResult = collection.updateMany(nativeQuery, new Document(MONGO_SET_OPERATOR, properties), updateOptions)
final UpdateResult updateResult = updateMany(collection, nativeQuery, new Document(MONGO_SET_OPERATOR, properties), updateOptions)
if (updateResult.wasAcknowledged()) {
try {
return updateResult.modifiedCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoIterable;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.connection.ClusterType;
import org.bson.Document;
import org.bson.codecs.Codec;
import org.bson.codecs.configuration.CodecProvider;
Expand Down Expand Up @@ -133,6 +134,9 @@ public class MongoDatastore extends AbstractDatastore implements MappingContext.
protected final Map<PersistentEntity, String> mongoDatabases = new ConcurrentHashMap<>();
protected final boolean stateless;
protected final boolean codecEngine;
protected final boolean transactionsEnabled;
private volatile Boolean transactionsSupported;
private volatile boolean warnedTransactionsUnsupported = false;
protected CodecRegistry codecRegistry;
protected final ConfigurableApplicationEventPublisher eventPublisher;
protected final PlatformTransactionManager transactionManager;
Expand Down Expand Up @@ -173,6 +177,7 @@ public MongoDatastore(final ConnectionSources<MongoClient, MongoConnectionSource
this.defaultFlushMode = settings.getFlushMode();
this.stateless = settings.isStateless();
this.codecEngine = settings.getEngine().equals(MongoConstants.CODEC_ENGINE);
this.transactionsEnabled = settings.isTransactional();
codecRegistry = CodecRegistries.fromRegistries(
CodecRegistries.fromProviders(new CodecExtensions(), new PersistentEntityCodeRegistry()),
mappingContext.getCodecRegistry(),
Expand Down Expand Up @@ -671,6 +676,54 @@ public MongoClient getMongoClient() {
return mongo;
}

/**
* Whether GORM should use real MongoDB multi-document transactions (a server-side
* {@code ClientSession}) for transactional operations. This is opt-in via
* {@code grails.mongodb.transactional} and additionally requires a replica set or sharded
* cluster; if a standalone topology is positively detected the feature is disabled (with a
* one-time warning) and GORM falls back to the legacy client-side flush behavior.
*
* @return {@code true} if server-side transactions should be used
* @since 8.0
*/
public boolean isTransactionsEnabled() {
if (!transactionsEnabled) {
return false;
}
// Once the topology is positively known it does not change at runtime, so latch the result to
// avoid recomputing (and possibly flipping) on every transaction begin.
Boolean supported = transactionsSupported;
if (supported != null) {
return supported;
}
try {
ClusterType clusterType = mongo.getClusterDescription().getType();
switch (clusterType) {
case STANDALONE:
transactionsSupported = Boolean.FALSE;
if (!warnedTransactionsUnsupported) {
warnedTransactionsUnsupported = true;
LOG.warn("grails.mongodb.transactional is enabled but the connected MongoDB topology is standalone, " +
"which does not support multi-document transactions. Falling back to flush-only transaction behavior.");
}
return false;
case REPLICA_SET:
case SHARDED:
case LOAD_BALANCED:
transactionsSupported = Boolean.TRUE;
return true;
default:
// UNKNOWN: topology not discovered yet. Assume transactions are available for this
// attempt without latching, so a later definitive determination can still apply.
return true;
}
}
catch (RuntimeException e) {
LOG.debug("Could not determine MongoDB cluster topology for transaction support; assuming transactions are available: {}", e.getMessage(), e);
return true;
}
}

public String getDatabaseName(PersistentEntity entity) {
if (entity.isMultiTenant() && multiTenancyMode == MultiTenancySettings.MultiTenancyMode.SCHEMA) {
return Tenants.currentId(getClass()).toString();
Expand Down
Loading
Loading