Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
package org.apache.ignite.internal.processors.query.calcite.message;

import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/**
*
*/
public class CalciteErrorMessage extends ErrorMessage implements CalciteMessage {
public class CalciteErrorMessage implements CalciteMarshalableMessage {
/** */
@Order(0)
UUID qryId;
Expand All @@ -33,19 +37,26 @@ public class CalciteErrorMessage extends ErrorMessage implements CalciteMessage
@Order(1)
long fragmentId;

/** Error bytes. */
@Order(2)
@GridToStringExclude
@Nullable public byte[] errBytes;

/** Error. */
private @Nullable Throwable err;

/** */
public CalciteErrorMessage() {
// No-op.
}

/** */
public CalciteErrorMessage(UUID qryId, long fragmentId, Throwable err) {
super(err);

assert err != null;

this.qryId = qryId;
this.fragmentId = fragmentId;
this.err = err;
}

/**
Expand All @@ -62,6 +73,11 @@ public long fragmentId() {
return fragmentId;
}

/** */
public @Nullable Throwable error() {
return err;
}

/** {@inheritDoc} */
@Override public MessageType type() {
return MessageType.QUERY_ERROR_MESSAGE;
Expand All @@ -71,4 +87,16 @@ public long fragmentId() {
@Override public short directType() {
return MessageType.QUERY_ERROR_MESSAGE.directType();
}

/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (err != null)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, we do && errBytes == null

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and the reason is not clear for me.
This MUST happen once.

errBytes = U.marshal(ctx.marshaller(), err);
}

/** {@inheritDoc} */
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
if (errBytes != null)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually w no && err == null

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and the reason is not clear for me.
This MUST happen once.

err = U.unmarshal(ctx.marshaller(), errBytes, U.resolveClassLoader(ctx.cache().context().gridConfig()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no keed to keep the bytes any more. Less memory consumption

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just let gc do it work.
This message has short life. Everything will be cleared after use.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GC won't deal with alive not nulls. My suggestion is not to consider how long message lives. Just do not keep obsoletes. WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'ts a preoptimisation.
Don't like such things.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,29 @@ private void start(Collection<String> code, boolean write) {
returnFalseIfWriteFailed(code, "writer.writeHeader", "directType()");

if (write && marshallableMessage()) {
imports.add("org.apache.ignite.IgniteCheckedException");
imports.add("org.apache.ignite.IgniteException");

code.add(EMPTY);

code.add(identedLine("try {"));

indent++;

code.add(identedLine("msg.prepareMarshal(marshaller);"));

indent--;

code.add(identedLine("}"));
code.add(identedLine("catch (IgniteCheckedException e) {"));

indent++;

code.add(identedLine("throw new IgniteException(\"Failed to marshal object\" + msg.getClass().getSimpleName(), e);"));

indent--;

code.add(identedLine("}"));
}

code.add(EMPTY);
Expand Down Expand Up @@ -949,8 +969,28 @@ private void finish(List<String> code, boolean read, boolean marshallable) {
code.add(EMPTY);

if (read && marshallable) {
imports.add("org.apache.ignite.IgniteCheckedException");
imports.add("org.apache.ignite.IgniteException");

code.add(identedLine("try {"));

indent++;

code.add(identedLine("msg.finishUnmarshal(marshaller, clsLdr);"));

indent--;

code.add(identedLine("}"));
code.add(identedLine("catch (IgniteCheckedException e) {"));

indent++;

code.add(identedLine("throw new IgniteException(\"Failed to unmarshal object\" + msg.getClass().getSimpleName(), e);"));

indent--;

code.add(identedLine("}"));

code.add(EMPTY);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,26 @@

package org.apache.ignite.internal.managers.communication;

import java.io.Serializable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.MessageProcessor;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MarshallableMessage;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.marshaller.Marshallers.jdk;

/**
* Message used to transfer {@link Throwable} objects.
* <p>Because raw serialization of throwables is prohibited, you should use this message when it is necessary
* to transfer some error as part of some message. See {@link MessageProcessor} for details.
* <p>Currently, under the hood marshalling and unmarshalling is performed by {@link JdkMarshaller}.
* <p>If the message serialization fails, wraps this error with own one.
*/
@SuppressWarnings({"NullableProblems", "unused"})
// TODO Remove Serializable once https://issues.apache.org/jira/browse/IGNITE-27627 is completed.
public class ErrorMessage implements Message, Serializable {
/** */
private static final long serialVersionUID = 0L;

/** Serialization and deserealization call holder. */
@Order(value = 0, method = "errorBytes")
public class ErrorMessage implements MarshallableMessage {
/** Error bytes. */
@Order(0)
@GridToStringExclude
@Nullable public byte[] errBytes;

/** Original error. It is transient and necessary only to avoid duplicated serialization and deserializtion. */
/** Error. */
private @Nullable Throwable err;

/**
Expand All @@ -62,61 +47,32 @@ public ErrorMessage() {
}

/**
* @param err Original error. Will be lazily serialized.
* @param err Original error.
*/
public ErrorMessage(@Nullable Throwable err) {
this.err = err;
}

/**
* Provides serialized bytes of the error. Should be called only once.
*
* @return Serialized error.
* @see MessageWriter
*/
public @Nullable byte[] errorBytes() {
if (err == null)
return null;

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
try {
return U.marshal(jdk(), err);
if (err != null)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we do && errBytes != null

errBytes = U.marshal(marsh, err);
}
catch (IgniteCheckedException e0) {
catch (IgniteCheckedException e) {
IgniteCheckedException wrappedErr = new IgniteCheckedException(err.getMessage());

wrappedErr.setStackTrace(err.getStackTrace());
wrappedErr.addSuppressed(e0);
wrappedErr.addSuppressed(e);

try {
return U.marshal(jdk(), wrappedErr);
}
catch (IgniteCheckedException e1) {
IgniteException marshErr = new IgniteException("Unable to marshal the wrapping error.", e1);

marshErr.addSuppressed(wrappedErr);

throw marshErr;
}
errBytes = U.marshal(marsh, wrappedErr);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to keep errBytes any more. Less memory consumption. The same in the other places

}
}

/**
* Deserializes the error from {@code errBytes}. Should be called only once.
*
* @param errBytes Serialized error.
* @see MessageWriter
*/
public void errorBytes(@Nullable byte[] errBytes) {
if (F.isEmpty(errBytes))
err = null;
else {
try {
err = U.unmarshal(jdk(), errBytes, U.gridClassLoader());
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to unmarshal error data bytes.", e);
}
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
if (errBytes != null)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we do && err != null. The same in the other places

err = U.unmarshal(marsh, errBytes, clsLdr);
}

/** */
Expand All @@ -125,8 +81,6 @@ public void errorBytes(@Nullable byte[] errBytes) {
}

/**
* Safely gets original error from an error message.
*
* @param errorMsg Error message.
* @return Error containing in the message.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ public void resetMetrics() {

List<MessageFactoryProvider> compMsgs = new ArrayList<>();

compMsgs.add(new GridIoMessageFactory());
compMsgs.add(new GridIoMessageFactory(marsh, U.gridClassLoader()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not ctx.marshallerContext().jdkMarshaller() ? Is widely used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has no answer, unfortunatelly


for (IgniteComponentType compType : IgniteComponentType.values()) {
MessageFactoryProvider f = compType.messageFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.UUIDCollectionMessageSerializer;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest;
Expand All @@ -350,12 +351,27 @@
* Message factory implementation.
*/
public class GridIoMessageFactory implements MessageFactoryProvider {
/** Custom data marshaller. */
private final Marshaller cstDataMarshall;

/** Class loader for the custom data marshalling. */
private final ClassLoader cstDataMarshallClsLdr;

/**
* @param cstDataMarshall Custom data marshaller.
* @param cstDataMarshallClsLdr Class loader for the custom data marshalling.
*/
public GridIoMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarshallClsLdr) {
this.cstDataMarshall = cstDataMarshall;
this.cstDataMarshallClsLdr = cstDataMarshallClsLdr;
}

/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
// -54 is reserved for SQL.
// We don't use the code‑generated serializer for CompressedMessage - serialization is highly customized.
factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new);
factory.register((short)-66, ErrorMessage::new, new ErrorMessageSerializer());
factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));
factory.register((short)-65, TxInfo::new, new TxInfoSerializer());
factory.register((short)-64, TxEntriesInfo::new, new TxEntriesInfoSerializer());
factory.register((short)-63, ExchangeInfo::new, new ExchangeInfoSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.ignite.internal.managers.discovery;

import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.communication.ErrorMessageSerializer;
import org.apache.ignite.internal.managers.communication.ErrorMessageMarshallableSerializer;
import org.apache.ignite.internal.processors.authentication.User;
import org.apache.ignite.internal.processors.authentication.UserAcceptedMessage;
import org.apache.ignite.internal.processors.authentication.UserAcceptedMessageSerializer;
Expand Down Expand Up @@ -132,23 +132,20 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessageSerializer;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessageSerializer;
import org.jetbrains.annotations.Nullable;

/** Message factory for discovery messages. */
public class DiscoveryMessageFactory implements MessageFactoryProvider {
/** Custom data marshaller. */
private final @Nullable Marshaller cstDataMarshall;
private final Marshaller cstDataMarshall;

/** Class loader for the custom data marshalling. */
private final @Nullable ClassLoader cstDataMarshallClsLdr;
private final ClassLoader cstDataMarshallClsLdr;

/**
* @param cstDataMarshall Custom data marshaller.
* @param cstDataMarshallClsLdr Class loader for the custom data marshalling.
*/
public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable ClassLoader cstDataMarshallClsLdr) {
assert cstDataMarshall == null && cstDataMarshallClsLdr == null || cstDataMarshall != null && cstDataMarshallClsLdr != null;

public DiscoveryMessageFactory(Marshaller cstDataMarshall, ClassLoader cstDataMarshallClsLdr) {
this.cstDataMarshall = cstDataMarshall;
this.cstDataMarshallClsLdr = cstDataMarshallClsLdr;
}
Expand All @@ -166,7 +163,7 @@ public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable C
factory.register((short)-102, TcpDiscoveryNodeMetricsMessage::new, new TcpDiscoveryNodeMetricsMessageSerializer());
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());
factory.register((short)-66, ErrorMessage::new, new ErrorMessageSerializer());
factory.register((short)-66, ErrorMessage::new, new ErrorMessageMarshallableSerializer(cstDataMarshall, cstDataMarshallClsLdr));

// TcpDiscoveryAbstractMessage
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@

package org.apache.ignite.plugin.extensions.communication;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.marshaller.Marshaller;

/** A {@link Message} which still requires external custom pre-marshalling and post-unmarshalling. */
public interface MarshallableMessage extends Message {
/** @param marsh External custom marshaller. */
public default void prepareMarshal(Marshaller marsh) {
public default void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
throw new UnsupportedOperationException();
}

/**
* @param marsh External custom marshaller.
* @param clsLdr External class loader to post-unmarshall.
*/
public default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) {
public default void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
throw new UnsupportedOperationException();
}
}
Loading
Loading