Skip to content

Commit

Permalink
IGNITE-23720 Add SessionContext API (#11749)
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim authored Dec 17, 2024
1 parent a1d17d1 commit 297bc9c
Show file tree
Hide file tree
Showing 74 changed files with 3,028 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionService;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.InjectResourcesService;
import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
Expand Down Expand Up @@ -236,6 +237,9 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
/** */
private final DistributedCalciteConfiguration distrCfg;

/** */
private final InjectResourcesService injectSvc;

/** */
private volatile boolean started;

Expand All @@ -259,6 +263,7 @@ public CalciteQueryProcessor(GridKernalContext ctx) {
prepareSvc = new PrepareServiceImpl(ctx);
timeoutSvc = new TimeoutServiceImpl(ctx);
qryReg = new QueryRegistryImpl(ctx);
injectSvc = new InjectResourcesService(ctx);

FrameworkConfig customFrameworkCfg = ctx.plugins().createComponent(FrameworkConfig.class);
frameworkCfg = customFrameworkCfg != null ? customFrameworkCfg : FRAMEWORK_CONFIG;
Expand Down Expand Up @@ -780,4 +785,9 @@ public DistributedCalciteConfiguration distributedConfiguration() {
public FrameworkConfig frameworkConfig() {
return frameworkCfg;
}

/** */
public InjectResourcesService injectService() {
return injectSvc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.session.SessionContext;
import org.jetbrains.annotations.Nullable;

/**
Expand Down Expand Up @@ -159,8 +160,19 @@ public RootQuery(
* @param schema new schema.
*/
public RootQuery<RowT> childQuery(SchemaPlus schema) {
return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), ctx.isLocal(), ctx.isForcedJoinOrder(),
ctx.partitions(), exch, unregister, log, plannerTimeout, totalTimeout);
return new RootQuery<>(
sql,
schema,
params,
QueryContext.of(cancel, ctx.unwrap(SessionContext.class)),
ctx.isLocal(),
ctx.isForcedJoinOrder(),
ctx.partitions(),
exch,
unregister,
log,
plannerTimeout,
totalTimeout);
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long fragm
.logger(log)
.build(),
taskExecutor(),
null,
qryId,
locaNodeId,
nodeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -46,6 +47,7 @@
import org.apache.ignite.internal.processors.cache.transactions.TransactionChanges;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactory;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ExpressionFactoryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.ReflectiveCallNotNullImplementor;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.ExecutionNodeMemoryTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.IoTracker;
import org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
Expand All @@ -63,6 +65,8 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.session.SessionContext;
import org.apache.ignite.session.SessionContextProvider;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -78,6 +82,9 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** Placeholder for NULL values in search bounds. */
private static final Object NULL_BOUND = new Object();

/** Emtpy session context. */
private static final SessionContext EMPTY_SESSION_CONTEXT = (attrName) -> null;

/** */
private final UUID qryId;

Expand Down Expand Up @@ -126,6 +133,15 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** */
private final long startTs;

/** */
private final InjectResourcesService injectSvc;

/** Map associates UDF name to instance of class that contains this UDF. */
private final Map<String, Object> udfInstances = new ConcurrentHashMap<>();

/** Session context provider injected into UDF targets. */
private final SessionContextProvider sesCtxProv = new SessionContextProviderImpl();

/** */
private Object[] correlations = new Object[16];

Expand All @@ -139,6 +155,7 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
public ExecutionContext(
BaseQueryContext qctx,
QueryTaskExecutor executor,
InjectResourcesService injectSvc,
UUID qryId,
UUID locNodeId,
UUID originatingNodeId,
Expand All @@ -154,6 +171,7 @@ public ExecutionContext(
super(qctx);

this.executor = executor;
this.injectSvc = injectSvc;
this.qryId = qryId;
this.locNodeId = locNodeId;
this.originatingNodeId = originatingNodeId;
Expand Down Expand Up @@ -457,6 +475,31 @@ public IoTracker ioTracker() {
return ioTracker;
}

/**
* Return an instance of class that contained a user defined function. If not exist yet, then instantiate the object
* and inject resources into it. Used by {@link ReflectiveCallNotNullImplementor} while it is preparing user function call.
*
* @param udfClsName Classname of the class contained UDF.
* @return Object with injected resources.
*/
public Object udfInstance(String udfClsName) {
return udfInstances.computeIfAbsent(udfClsName, ignore -> {
try {
Class<?> funcCls = getClass().getClassLoader().loadClass(udfClsName);

Object target = funcCls.getConstructor().newInstance();

injectSvc.injectToUdf(target, sesCtxProv);

return target;
}
catch (Exception e) {
throw new IgniteException("Failed to instantiate an object for UDF. " +
"Class " + udfClsName + " must have public zero-args constructor.", e);
}
});
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
Expand All @@ -473,4 +516,14 @@ public IoTracker ioTracker() {
@Override public int hashCode() {
return Objects.hash(qryId, fragmentDesc.fragmentId());
}

/** */
private class SessionContextProviderImpl implements SessionContextProvider {
/** {@inheritDoc} */
@Override public @Nullable SessionContext getSessionContext() {
SessionContext ctx = unwrap(SessionContext.class);

return ctx == null ? EMPTY_SESSION_CONTEXT : ctx;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.cache.context.SessionContextImpl;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
Expand Down Expand Up @@ -199,6 +200,9 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut
/** */
private MemoryTracker memoryTracker;

/** */
private InjectResourcesService injectSvc;

/**
* @param ctx Kernal.
*/
Expand Down Expand Up @@ -422,6 +426,11 @@ public MemoryTracker memoryTracker() {
return memoryTracker;
}

/** */
public void injectService(InjectResourcesService injectSvc) {
this.injectSvc = injectSvc;
}

/** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
this.ctx = ctx;
Expand All @@ -446,6 +455,7 @@ public MemoryTracker memoryTracker() {
exchangeService(proc.exchangeService());
queryRegistry(proc.queryRegistry());
prepareService(proc.prepareService());
injectService(proc.injectService());

ddlCmdHnd = new DdlCommandHandler(ctx.query(), ctx.cache(), ctx.security(), () -> schemaHolder().schema(null));

Expand Down Expand Up @@ -620,6 +630,7 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
ExecutionContext<Row> ectx = new ExecutionContext<>(
qry.context(),
taskExecutor(),
injectSvc,
qry.id(),
locNodeId,
locNodeId,
Expand Down Expand Up @@ -659,6 +670,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
qry.onResponse(nodeId, fragment.fragmentId(), ex);
else {
try {
SessionContextImpl sesCtx = qry.context().unwrap(SessionContextImpl.class);

QueryStartRequest req = new QueryStartRequest(
qry.id(),
qry.localQueryId(),
Expand All @@ -670,7 +683,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
qry.parameters(),
parametersMarshalled,
timeout,
ectx.getQryTxEntries()
ectx.getQryTxEntries(),
sesCtx == null ? null : sesCtx.attributes()
);

messageService().send(nodeId, req);
Expand Down Expand Up @@ -857,7 +871,9 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
)
);

final BaseQueryContext qctx = createQueryContext(Contexts.empty(), msg.schema());
final BaseQueryContext qctx = createQueryContext(
msg.appAttrs() == null ? Contexts.empty() : Contexts.of(new SessionContextImpl(msg.appAttrs())),
msg.schema());

QueryPlan qryPlan = queryPlanCache().queryPlan(
new CacheKey(msg.schema(), msg.root()),
Expand All @@ -869,6 +885,7 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
ExecutionContext<Row> ectx = new ExecutionContext<>(
qctx,
taskExecutor(),
injectSvc,
msg.queryId(),
locNodeId,
nodeId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
import org.apache.ignite.session.SessionContextProvider;

/** Service that helps inject Ignite resources to Calcite functions. */
public class InjectResourcesService extends AbstractService {
/** */
private final GridResourceProcessor rsrcProc;

/** */
public InjectResourcesService(GridKernalContext ctx) {
super(ctx);

rsrcProc = ctx.resource();
}

/**
* Inject resources to object contained a user defined function.
* @param obj Object to inject resources.
* @param sesCtxProv Session context provider.
* @throws IgniteCheckedException If failed to inject.
*/
void injectToUdf(Object obj, SessionContextProvider sesCtxProv) throws IgniteCheckedException {
rsrcProc.injectToUdf(obj, sesCtxProv);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.exec.exp;

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
Expand Down Expand Up @@ -47,8 +46,6 @@ private IgniteScalarFunction(Method method, CallImplementor implementor) {
* @return Created {@link ScalarFunction}.
*/
public static ScalarFunction create(Method method) {
assert Modifier.isStatic(method.getModifiers());

CallImplementor implementor = RexImpTable.createImplementor(
new ReflectiveCallNotNullImplementor(method), NullPolicy.NONE, false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rex.RexCall;

import static org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod.UDF_INSTANCE;

/**
* Implementation of {@link NotNullImplementor} that calls a given {@link Method}.
*
Expand Down Expand Up @@ -55,10 +57,13 @@ public ReflectiveCallNotNullImplementor(Method method) {
callExpr = Expressions.call(method, translatedOperands);

else {
// The UDF class must have a public zero-args constructor.
// Assume that the validator checked already.
final Expression target =
Expressions.new_(method.getDeclaringClass());
final Expression target = Expressions.convert_(
Expressions.call(
translator.getRoot(),
UDF_INSTANCE.method(),
Expressions.constant(method.getDeclaringClass().getName())),
method.getDeclaringClass());

callExpr = Expressions.call(target, method, translatedOperands);
}
if (!containsCheckedException(method))
Expand Down
Loading

0 comments on commit 297bc9c

Please sign in to comment.