Commit 40d06b8c authored by Oleg Zhurakousky's avatar Oleg Zhurakousky
Browse files

GH-422 GH-606 Add support for normalizing structure-mode CE message

Normalizing in this context means converting it to binary-mode so the rest of the processing logic is the same.
Added support for canonical attribute names. Now, internally any attribute can be set as 'ce_' regardless where it came from are where it goes to as the frameork will be able to recognize both
Removed CloudEventMessageConverter
Renamed CloudEventAttributes to CloudEventAttributesHelperas it is better suited to what it actually does
Showing with 325 additions and 278 deletions
+325 -278
......@@ -19,13 +19,15 @@ package org.springframework.cloud.function.cloudevent;
import java.util.HashMap;
import java.util.Map;
import org.springframework.util.StringUtils;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public class CloudEventAttributes extends HashMap<String, Object> {
public class CloudEventAttributesHelper extends HashMap<String, Object> {
/**
*
......@@ -34,51 +36,90 @@ public class CloudEventAttributes extends HashMap<String, Object> {
CloudEventAttributes(Map<String, Object> headers) {
CloudEventAttributesHelper(Map<String, Object> headers) {
super(headers);
}
@SuppressWarnings("unchecked")
public <A> A getId() {
return this.containsKey(CloudEventMessageUtils.CE_ID)
? (A) this.get(CloudEventMessageUtils.CE_ID)
: (A) this.get(CloudEventMessageUtils.ID);
if (this.containsKey(CloudEventMessageUtils.CANONICAL_ID)) {
return (A) this.get(CloudEventMessageUtils.CANONICAL_ID);
}
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID)) {
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID);
}
return null;
}
String getAttributeName(String attributeName) {
if (this.containsKey(CloudEventMessageUtils.ATTR_PREFIX + attributeName)) {
return CloudEventMessageUtils.ATTR_PREFIX + attributeName;
}
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName)) {
return CloudEventMessageUtils.HTTP_ATTR_PREFIX + attributeName;
}
return attributeName;
}
@SuppressWarnings("unchecked")
public <A> A getSource() {
return this.containsKey(CloudEventMessageUtils.CE_SOURCE)
? (A) this.get(CloudEventMessageUtils.CE_SOURCE)
: (A) this.get(CloudEventMessageUtils.SOURCE);
if (this.containsKey(CloudEventMessageUtils.CANONICAL_SOURCE)) {
return (A) this.get(CloudEventMessageUtils.CANONICAL_SOURCE);
}
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE)) {
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE);
}
return (A) this.get(CloudEventMessageUtils.SOURCE);
}
@SuppressWarnings("unchecked")
public <A> A getSpecversion() {
return this.containsKey(CloudEventMessageUtils.CE_SPECVERSION)
? (A) this.get(CloudEventMessageUtils.CE_SPECVERSION)
: (A) this.get(CloudEventMessageUtils.SPECVERSION);
if (this.containsKey(CloudEventMessageUtils.CANONICAL_SPECVERSION)) {
return (A) this.get(CloudEventMessageUtils.CANONICAL_SPECVERSION);
}
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION)) {
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION);
}
return (A) this.get(CloudEventMessageUtils.SPECVERSION);
}
@SuppressWarnings("unchecked")
public <A> A getType() {
return this.containsKey(CloudEventMessageUtils.CE_TYPE)
? (A) this.get(CloudEventMessageUtils.CE_TYPE)
: (A) this.get(CloudEventMessageUtils.TYPE);
if (this.containsKey(CloudEventMessageUtils.CANONICAL_TYPE)) {
return (A) this.get(CloudEventMessageUtils.CANONICAL_TYPE);
}
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE)) {
return (A) this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE);
}
return (A) this.get(CloudEventMessageUtils.TYPE);
}
@SuppressWarnings("unchecked")
public <A> A getDataContentType() {
return this.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE)
? (A) this.get(CloudEventMessageUtils.CE_DATACONTENTTYPE)
: (A) this.get(CloudEventMessageUtils.DATACONTENTTYPE);
Object dataContentType;
if (this.containsKey(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE)) {
dataContentType = this.get(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE);
}
else if (this.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATACONTENTTYPE)) {
dataContentType = this.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATACONTENTTYPE);
}
dataContentType = this.get(CloudEventMessageUtils.DATACONTENTTYPE);
return (A) dataContentType;
}
public void setDataContentType(String datacontenttype) {
this.put(CloudEventMessageUtils.CE_DATACONTENTTYPE, datacontenttype);
this.put(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, datacontenttype);
}
@SuppressWarnings("unchecked")
public <A> A getAtttribute(String name) {
return (A) this.get(name);
}
public boolean isValidCloudEvent() {
return StringUtils.hasText(this.getId())
&& StringUtils.hasText(this.getSource())
&& StringUtils.hasText(this.getSpecversion())
&& StringUtils.hasText(this.getType());
}
}
......@@ -31,7 +31,7 @@ public interface CloudEventAttributesProvider {
*
* @param inputMessage input message used to invoke user functionality (e.g., function)
* @param result result of the invocation of user functionality (e.g., function)
* @return instance of {@link CloudEventAttributes}
* @return instance of {@link CloudEventAttributesHelper}
*/
Map<String, Object> generateDefaultCloudEventHeaders(Message<?> inputMessage, Object result);
}
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import java.util.Map;
import java.util.function.Function;
import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.ContentTypeResolver;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* A Cloud Events specific pre-processor that is added to {@link SmartCompositeMessageConverter}
* to potentially modify incoming message.
* <br><br>
* For Cloud Event coming in binary-mode such modification implies determining
* content type of the 'data' attribute (see {@link #getDataContentType(MessageHeaders)}
* of Cloud Event and creating a new {@link Message} with its `contentType` set to such
* content type while copying the rest of the headers.
* <br><br>
* Similar to Cloud Event coming in binary-mode, the Cloud Event coming in structured-mode
* such modification also implies determining content type of the 'data' attribute
* (see {@link #getDataContentType(MessageHeaders)}...
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public class CloudEventDataContentTypeMessagePreProcessor implements Function<Message<?>, Message<?>> {
private final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
private final MimeType cloudEventContentType = CloudEventMessageUtils.APPLICATION_CLOUDEVENTS;
private final CompositeMessageConverter messageConverter;
public CloudEventDataContentTypeMessagePreProcessor(CompositeMessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' must not be null");
this.messageConverter = messageConverter;
}
@SuppressWarnings("unchecked")
@Override
public Message<?> apply(Message<?> inputMessage) {
if (CloudEventMessageUtils.isBinary(inputMessage.getHeaders())) {
String dataContentType = this.getDataContentType(inputMessage.getHeaders());
Message<?> message = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, dataContentType)
// .setHeader("originalContentType", inputMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE)) not sure about it
.build();
return message;
}
else if (this.isStructured(inputMessage)) {
MimeType contentType = this.contentTypeResolver.resolve(inputMessage.getHeaders());
String dataContentType = this.getDataContentType(inputMessage.getHeaders());
String suffix = contentType.getSubtypeSuffix();
MimeType cloudEventDeserializationContentType = MimeTypeUtils
.parseMimeType(contentType.getType() + "/" + suffix);
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
.setHeader(CloudEventMessageUtils.CE_DATACONTENTTYPE, dataContentType).build();
Map<String, Object> structuredCloudEvent = (Map<String, Object>) this.messageConverter
.fromMessage(cloudEventMessage, Map.class);
Message<?> binaryCeMessage = this.buildCeMessageFromStructured(structuredCloudEvent);
return binaryCeMessage;
}
else {
return inputMessage;
}
}
private Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent) {
MessageBuilder<?> builder = MessageBuilder.withPayload(
structuredCloudEvent.containsKey(CloudEventMessageUtils.CE_DATA)
? structuredCloudEvent.get(CloudEventMessageUtils.CE_DATA)
: structuredCloudEvent.get(CloudEventMessageUtils.DATA));
structuredCloudEvent.remove(CloudEventMessageUtils.CE_DATA);
structuredCloudEvent.remove(CloudEventMessageUtils.DATA);
builder.copyHeaders(structuredCloudEvent);
return builder.build();
}
private String getDataContentType(MessageHeaders headers) {
if (headers.containsKey(CloudEventMessageUtils.DATACONTENTTYPE)) {
return (String) headers.get(CloudEventMessageUtils.DATACONTENTTYPE);
}
else if (headers.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE)) {
return (String) headers.get(CloudEventMessageUtils.CE_DATACONTENTTYPE);
}
else if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
return headers.get(MessageHeaders.CONTENT_TYPE).toString();
}
return MimeTypeUtils.APPLICATION_JSON_VALUE;
}
private boolean isStructured(Message<?> message) {
if (!CloudEventMessageUtils.isBinary(message.getHeaders())) {
Map<String, Object> headers = message.getHeaders();
if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
MimeType contentType = this.contentTypeResolver.resolve(message.getHeaders());
if (contentType.getType().equals(this.cloudEventContentType.getType())
&& contentType.getSubtype().startsWith(this.cloudEventContentType.getSubtype())) {
return true;
}
}
}
return false;
}
}
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.cloudevent;
import org.springframework.cloud.function.context.config.JsonMessageConverter;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.MimeType;
/**
* Implementation of {@link MessageConverter} which uses Jackson or Gson libraries to do the
* actual conversion via {@link JsonMapper} instance.
*
* @author Oleg Zhurakousky
*
* @since 3.1
*/
public class CloudEventJsonMessageConverter extends JsonMessageConverter {
public CloudEventJsonMessageConverter(JsonMapper jsonMapper) {
super(jsonMapper, new MimeType(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType(),
CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype() + "+json"));
this.setStrictContentTypeMatch(true);
}
}
......@@ -18,13 +18,19 @@ package org.springframework.cloud.function.cloudevent;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.ContentTypeResolver;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StringUtils;
/**
* Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/.
......@@ -36,6 +42,8 @@ import org.springframework.util.MimeTypeUtils;
*/
public final class CloudEventMessageUtils {
private static final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();
private CloudEventMessageUtils() {
}
......@@ -53,7 +61,12 @@ public final class CloudEventMessageUtils {
/**
* Prefix for attributes.
*/
public static String ATTR_PREFIX = "ce-";
public static String ATTR_PREFIX = "ce_";
/**
* Prefix for attributes.
*/
public static String HTTP_ATTR_PREFIX = "ce-";
/**
* Value for 'data' attribute.
......@@ -63,7 +76,7 @@ public final class CloudEventMessageUtils {
/**
* Value for 'data' attribute with prefix.
*/
public static String CE_DATA = ATTR_PREFIX + DATA;
public static String CANONICAL_DATA = ATTR_PREFIX + DATA;
/**
* Value for 'id' attribute.
......@@ -73,7 +86,7 @@ public final class CloudEventMessageUtils {
/**
* Value for 'id' attribute with prefix.
*/
public static String CE_ID = ATTR_PREFIX + ID;
public static String CANONICAL_ID = ATTR_PREFIX + ID;
/**
* Value for 'source' attribute.
......@@ -83,7 +96,7 @@ public final class CloudEventMessageUtils {
/**
* Value for 'source' attribute with prefix.
*/
public static String CE_SOURCE = ATTR_PREFIX + SOURCE;
public static String CANONICAL_SOURCE = ATTR_PREFIX + SOURCE;
/**
* Value for 'specversion' attribute.
......@@ -93,7 +106,7 @@ public final class CloudEventMessageUtils {
/**
* Value for 'specversion' attribute with prefix.
*/
public static String CE_SPECVERSION = ATTR_PREFIX + SPECVERSION;
public static String CANONICAL_SPECVERSION = ATTR_PREFIX + SPECVERSION;
/**
* Value for 'type' attribute.
......@@ -103,7 +116,7 @@ public final class CloudEventMessageUtils {
/**
* Value for 'type' attribute with prefix.
*/
public static String CE_TYPE = ATTR_PREFIX + TYPE;
public static String CANONICAL_TYPE = ATTR_PREFIX + TYPE;
/**
* Value for 'datacontenttype' attribute.
......@@ -113,7 +126,7 @@ public final class CloudEventMessageUtils {
/**
* Value for 'datacontenttype' attribute with prefix.
*/
public static String CE_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE;
public static String CANONICAL_DATACONTENTTYPE = ATTR_PREFIX + DATACONTENTTYPE;
/**
* Value for 'dataschema' attribute.
......@@ -123,7 +136,7 @@ public final class CloudEventMessageUtils {
/**
* Value for 'dataschema' attribute with prefix.
*/
public static String CE_DATASCHEMA = ATTR_PREFIX + DATASCHEMA;
public static String CANONICAL_DATASCHEMA = ATTR_PREFIX + DATASCHEMA;
/**
* Value for 'subject' attribute.
......@@ -133,7 +146,7 @@ public final class CloudEventMessageUtils {
/**
* Value for 'subject' attribute with prefix.
*/
public static String CE_SUBJECT = ATTR_PREFIX + SUBJECT;
public static String CANONICAL_SUBJECT = ATTR_PREFIX + SUBJECT;
/**
* Value for 'time' attribute.
......@@ -143,68 +156,130 @@ public final class CloudEventMessageUtils {
/**
* Value for 'time' attribute with prefix.
*/
public static String CE_TIME = ATTR_PREFIX + TIME;
public static String CANONICAL_TIME = ATTR_PREFIX + TIME;
/**
* Checks if {@link Message} represents cloud event in binary-mode.
*/
public static boolean isBinary(Map<String, Object> headers) {
return (headers.containsKey(ID)
&& headers.containsKey(SOURCE)
&& headers.containsKey(SPECVERSION)
&& headers.containsKey(TYPE))
||
(headers.containsKey(CE_ID)
&& headers.containsKey(CE_SOURCE)
&& headers.containsKey(CE_SPECVERSION)
&& headers.containsKey(CE_TYPE));
CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(headers);
return attributes.isValidCloudEvent();
}
/**
* Will construct instance of {@link CloudEventAttributes} setting its required attributes.
* Will construct instance of {@link CloudEventAttributesHelper} setting its required attributes.
*
* @param ce_id value for Cloud Event 'id' attribute
* @param ce_specversion value for Cloud Event 'specversion' attribute
* @param ce_source value for Cloud Event 'source' attribute
* @param ce_type value for Cloud Event 'type' attribute
* @return instance of {@link CloudEventAttributes}
* @return instance of {@link CloudEventAttributesHelper}
*/
public static CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) {
public static CloudEventAttributesHelper get(String ce_id, String ce_specversion, String ce_source, String ce_type) {
Assert.hasText(ce_id, "'ce_id' must not be null or empty");
Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty");
Assert.hasText(ce_source, "'ce_source' must not be null or empty");
Assert.hasText(ce_type, "'ce_type' must not be null or empty");
Map<String, Object> requiredAttributes = new HashMap<>();
requiredAttributes.put(CloudEventMessageUtils.CE_ID, ce_id);
requiredAttributes.put(CloudEventMessageUtils.CE_SPECVERSION, ce_specversion);
requiredAttributes.put(CloudEventMessageUtils.CE_SOURCE, ce_source);
requiredAttributes.put(CloudEventMessageUtils.CE_TYPE, ce_type);
return new CloudEventAttributes(requiredAttributes);
requiredAttributes.put(CloudEventMessageUtils.CANONICAL_ID, ce_id);
requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SPECVERSION, ce_specversion);
requiredAttributes.put(CloudEventMessageUtils.CANONICAL_SOURCE, ce_source);
requiredAttributes.put(CloudEventMessageUtils.CANONICAL_TYPE, ce_type);
return new CloudEventAttributesHelper(requiredAttributes);
}
/**
* Will construct instance of {@link CloudEventAttributes}
* Will construct instance of {@link CloudEventAttributesHelper}
* Should default/generate cloud event ID and SPECVERSION.
*
* @param ce_source value for Cloud Event 'source' attribute
* @param ce_type value for Cloud Event 'type' attribute
* @return instance of {@link CloudEventAttributes}
* @return instance of {@link CloudEventAttributesHelper}
*/
public static CloudEventAttributes get(String ce_source, String ce_type) {
public static CloudEventAttributesHelper get(String ce_source, String ce_type) {
return get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type);
}
/**
* Will construct instance of {@link CloudEventAttributes} from {@link MessageHeaders}.
* Will construct instance of {@link CloudEventAttributesHelper} from {@link MessageHeaders}.
*
* Should copy Cloud Event related headers into an instance of {@link CloudEventAttributes}
* Should copy Cloud Event related headers into an instance of {@link CloudEventAttributesHelper}
* NOTE: Certain headers must not be copied.
*
* @param headers instance of {@link MessageHeaders}
* @return modifiable instance of {@link CloudEventAttributes}
* @return modifiable instance of {@link CloudEventAttributesHelper}
*/
public static RequiredAttributeAccessor get(MessageHeaders headers) {
return new RequiredAttributeAccessor(headers);
}
@SuppressWarnings("unchecked")
public static Message<?> toBinary(Message<?> inputMessage, MessageConverter messageConverter) {
Map<String, Object> headers = inputMessage.getHeaders();
CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(headers);
// first check the obvious and see if content-type is `cloudevents`
if (!attributes.isValidCloudEvent() && headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
MimeType contentType = contentTypeResolver.resolve(inputMessage.getHeaders());
if (contentType.getType().equals(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType())
&& contentType.getSubtype().startsWith(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype())) {
String dataContentType = StringUtils.hasText(attributes.getDataContentType())
? attributes.getDataContentType()
: MimeTypeUtils.APPLICATION_JSON_VALUE;
String suffix = contentType.getSubtypeSuffix();
MimeType cloudEventDeserializationContentType = MimeTypeUtils
.parseMimeType(contentType.getType() + "/" + suffix);
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
.setHeader(CloudEventMessageUtils.CANONICAL_DATACONTENTTYPE, dataContentType).build();
Map<String, Object> structuredCloudEvent = (Map<String, Object>) messageConverter.fromMessage(cloudEventMessage, Map.class);
Message<?> binaryCeMessage = buildCeMessageFromStructured(structuredCloudEvent, determinePrefixToUse(inputMessage));
return binaryCeMessage;
}
}
else if (StringUtils.hasText(attributes.getDataContentType())) {
return MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, attributes.getDataContentType())
.build();
}
return inputMessage;
}
private static Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent, String prefixToUse) {
Object data = null;
if (structuredCloudEvent.containsKey(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA)) {
data = structuredCloudEvent.get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
structuredCloudEvent.remove(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.DATA);
}
else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.CANONICAL_DATA)) {
data = structuredCloudEvent.get(CloudEventMessageUtils.CANONICAL_DATA);
structuredCloudEvent.remove(CloudEventMessageUtils.CANONICAL_DATA);
}
else if (structuredCloudEvent.containsKey(CloudEventMessageUtils.DATA)) {
data = structuredCloudEvent.get(CloudEventMessageUtils.DATA);
structuredCloudEvent.remove(CloudEventMessageUtils.DATA);
}
Assert.notNull(data, "'data' must not be null");
MessageBuilder<?> builder = MessageBuilder.withPayload(data);
CloudEventAttributesHelper attributes = new CloudEventAttributesHelper(structuredCloudEvent);
builder.setHeader(prefixToUse + CloudEventMessageUtils.ID, attributes.getId());
builder.setHeader(prefixToUse + CloudEventMessageUtils.SOURCE, attributes.getSource());
builder.setHeader(prefixToUse + CloudEventMessageUtils.TYPE, attributes.getType());
builder.setHeader(prefixToUse + CloudEventMessageUtils.SPECVERSION, attributes.getSpecversion());
return builder.build();
}
public static String determinePrefixToUse(Message<?> inputMessage) {
Set<String> keys = inputMessage.getHeaders().keySet();
if (keys.contains("user-agent")) {
return CloudEventMessageUtils.HTTP_ATTR_PREFIX;
}
else {
return CloudEventMessageUtils.ATTR_PREFIX;
}
}
}
......@@ -41,9 +41,10 @@ public class DefaultCloudEventAttributesProvider implements CloudEventAttributes
@Override
public Map<String, Object> generateDefaultCloudEventHeaders(Message<?> inputMessage, Object result) {
if (inputMessage.getHeaders().containsKey(CloudEventMessageUtils.CE_ID)) { // input is a cloud event
RequiredAttributeAccessor attributes = new RequiredAttributeAccessor(inputMessage.getHeaders(), CloudEventMessageUtils.determinePrefixToUse(inputMessage));
if (attributes.isValidCloudEvent()) {
String applicationName = this.getApplicationName();
return CloudEventMessageUtils.get(inputMessage.getHeaders())
return attributes
.setId(UUID.randomUUID().toString())
.setType(result.getClass().getName())
.setSource(applicationName);
......
......@@ -18,39 +18,68 @@ package org.springframework.cloud.function.cloudevent;
import java.util.Map;
import org.springframework.util.StringUtils;
/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public class RequiredAttributeAccessor extends CloudEventAttributes {
public class RequiredAttributeAccessor extends CloudEventAttributesHelper {
private final String prefixToUse;
/**
*
*/
private static final long serialVersionUID = 859410409447601477L;
RequiredAttributeAccessor(Map<String, Object> headers) {
RequiredAttributeAccessor(Map<String, Object> headers, String prefixToUse) {
super(headers);
this.prefixToUse = prefixToUse;
}
RequiredAttributeAccessor(Map<String, Object> headers) {
this(headers, null);
}
public RequiredAttributeAccessor setId(String id) {
this.put(CloudEventMessageUtils.CE_ID, id);
if (StringUtils.hasText(this.prefixToUse)) {
this.put(this.prefixToUse + CloudEventMessageUtils.ID, id);
}
else {
this.put(this.getAttributeName(CloudEventMessageUtils.ID), id);
}
return this;
}
public RequiredAttributeAccessor setSource(String source) {
this.put(CloudEventMessageUtils.CE_SOURCE, source);
if (StringUtils.hasText(this.prefixToUse)) {
this.put(this.prefixToUse + CloudEventMessageUtils.SOURCE, source);
}
else {
this.put(this.getAttributeName(CloudEventMessageUtils.SOURCE), source);
}
return this;
}
public RequiredAttributeAccessor setSpecversion(String specversion) {
this.put(CloudEventMessageUtils.CE_SPECVERSION, specversion);
if (StringUtils.hasText(this.prefixToUse)) {
this.put(this.prefixToUse + CloudEventMessageUtils.SPECVERSION, specversion);
}
else {
this.put(this.getAttributeName(CloudEventMessageUtils.SPECVERSION), specversion);
}
return this;
}
public RequiredAttributeAccessor setType(String type) {
this.put(CloudEventMessageUtils.CE_TYPE, type);
if (StringUtils.hasText(this.prefixToUse)) {
this.put(this.prefixToUse + CloudEventMessageUtils.TYPE, type);
}
else {
this.put(this.getAttributeName(CloudEventMessageUtils.TYPE), type);
}
return this;
}
}
......@@ -47,6 +47,7 @@ import reactor.util.function.Tuples;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
......@@ -821,6 +822,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
&& !this.isInputTypeMessage()) { //TODO rework
return null;
}
input = CloudEventMessageUtils.toBinary((Message<?>) input, messageConverter);
convertedInput = this.convertInputMessageIfNecessary((Message) input, type);
if (convertedInput == null) { // give ConversionService a chance
convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false);
......
......@@ -31,8 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
import org.springframework.cloud.function.cloudevent.CloudEventDataContentTypeMessagePreProcessor;
import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter;
import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
......@@ -107,15 +105,12 @@ public class ContextFunctionCatalogAutoConfiguration {
.collect(Collectors.toList());
mcList.add(new JsonMessageConverter(jsonMapper));
mcList.add(new CloudEventJsonMessageConverter(jsonMapper));
mcList.add(new ByteArrayMessageConverter());
mcList.add(new StringMessageConverter());
mcList.add(new PrimitiveTypesFromStringMessageConverter(conversionService));
if (!CollectionUtils.isEmpty(mcList)) {
messageConverter = new SmartCompositeMessageConverter(mcList);
CloudEventDataContentTypeMessagePreProcessor messagePreProcessor = new CloudEventDataContentTypeMessagePreProcessor(messageConverter);
messageConverter.setMessagePreProcessor(messagePreProcessor);
}
return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper);
......
......@@ -20,6 +20,7 @@ import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
......@@ -42,7 +43,8 @@ public class JsonMessageConverter extends AbstractMessageConverter {
private final JsonMapper jsonMapper;
public JsonMessageConverter(JsonMapper jsonMapper) {
this(jsonMapper, new MimeType("application", "json"));
this(jsonMapper, new MimeType("application", "json"), new MimeType(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType(),
CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype() + "+json"));
}
public JsonMessageConverter(JsonMapper jsonMapper, MimeType... supportedMimeTypes) {
......
......@@ -18,7 +18,6 @@ package org.springframework.cloud.function.context.config;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
......@@ -38,8 +37,6 @@ import org.springframework.util.StringUtils;
*/
public class SmartCompositeMessageConverter extends CompositeMessageConverter {
private Function<Message<?>, Message<?>> preProcessor;
public SmartCompositeMessageConverter(Collection<MessageConverter> converters) {
super(converters);
}
......@@ -47,9 +44,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
@Override
@Nullable
public Object fromMessage(Message<?> message, Class<?> targetClass) {
if (this.preProcessor != null) {
message = this.preProcessor.apply(message);
}
for (MessageConverter converter : getConverters()) {
Object result = converter.fromMessage(message, targetClass);
if (result != null) {
......@@ -62,9 +56,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
@Override
@Nullable
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
if (this.preProcessor != null) {
message = this.preProcessor.apply(message);
}
for (MessageConverter converter : getConverters()) {
Object result = (converter instanceof SmartMessageConverter ?
((SmartMessageConverter) converter).fromMessage(message, targetClass, conversionHint) :
......@@ -76,7 +67,6 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
return null;
}
@SuppressWarnings("unchecked")
@Override
@Nullable
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
......@@ -142,8 +132,4 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
}
return null;
}
public void setMessagePreProcessor(Function<Message<?>, Message<?>> preProcessor) {
this.preProcessor = preProcessor;
}
}
......@@ -45,7 +45,7 @@ public class CloudEventTypeConversionTests {
@Test
public void testFromMessageBinaryPayloadMatchesType() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils
CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils
.get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
ceAttributes.setDataContentType("text/plain");
Message<String> message = MessageBuilder.withPayload("Hello Ricky").copyHeaders(ceAttributes).build();
......@@ -57,7 +57,7 @@ public class CloudEventTypeConversionTests {
@Test
public void testFromMessageBinaryPayloadDoesNotMatchType() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils
CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils
.get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
Message<byte[]> message = MessageBuilder.withPayload("Hello Ricky".getBytes())
.copyHeaders(ceAttributes)
......@@ -71,7 +71,7 @@ public class CloudEventTypeConversionTests {
@Test // JsonMessageConverter does some special things between byte[] and String so this works
public void testFromMessageBinaryPayloadNoDataContentTypeToString() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils
CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils
.get(UUID.randomUUID().toString(), "1.0", "https://spring.io/", "org.springframework");
Message<byte[]> message = MessageBuilder.withPayload("Hello Ricky".getBytes())
.copyHeaders(ceAttributes)
......@@ -85,7 +85,7 @@ public class CloudEventTypeConversionTests {
@Test // Unlike the previous test the type here is POJO so no special treatement
public void testFromMessageBinaryPayloadNoDataContentTypeToPOJO() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework");
CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework");
Message<byte[]> message = MessageBuilder.withPayload("Hello Ricky".getBytes())
.copyHeaders(ceAttributes)
.setHeader(MessageHeaders.CONTENT_TYPE,
......@@ -98,7 +98,7 @@ public class CloudEventTypeConversionTests {
@Test // will fall on default CT which is json
public void testFromMessageBinaryPayloadNoDataContentTypeToPOJOThatWorks() {
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
CloudEventAttributes ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework");
CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get("https://spring.io/", "org.springframework");
Message<byte[]> message = MessageBuilder.withPayload("{\"name\":\"Ricky\"}".getBytes())
.copyHeaders(ceAttributes)
.setHeader(MessageHeaders.CONTENT_TYPE,
......@@ -108,30 +108,6 @@ public class CloudEventTypeConversionTests {
assertThat(converted.getName()).isEqualTo("Ricky");
}
@Test // will fall on default CT which is json
public void testFromMessageStructured() {
String cloudEventStructured = "{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"org.springframework\",\n" +
" \"source\" : \"https://spring.io/\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"datacontenttype\" : \"application/json\",\n" +
" \"data\" : {\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }\n" +
" }";
SmartCompositeMessageConverter messageConverter = this.configure(DummyConfiguration.class);
Message<String> message = MessageBuilder.withPayload(cloudEventStructured)
.setHeader(MessageHeaders.CONTENT_TYPE, CloudEventMessageUtils.APPLICATION_CLOUDEVENTS_VALUE + "+json")
.setHeader(CloudEventMessageUtils.CE_DATACONTENTTYPE, MimeTypeUtils.APPLICATION_JSON_VALUE).build();
SpringReleaseEvent springReleaseEvent = (SpringReleaseEvent) messageConverter.fromMessage(message,
SpringReleaseEvent.class);
assertThat(springReleaseEvent.getReleaseName()).isEqualTo("Spring Framework");
assertThat(springReleaseEvent.getVersion()).isEqualTo("1.0");
}
private SmartCompositeMessageConverter configure(Class<?>... configClass) {
ApplicationContext context = new SpringApplicationBuilder(configClass).run(
"--logging.level.org.springframework.cloud.function=DEBUG", "--spring.main.lazy-initialization=true");
......
......@@ -21,7 +21,7 @@ import java.util.function.Function;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.cloudevent.CloudEventAttributes;
import org.springframework.cloud.function.cloudevent.CloudEventAttributesHelper;
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.context.annotation.Bean;
......@@ -90,7 +90,7 @@ public class CloudeventDemoApplication {
data.setVersion("2.0");
data.setReleaseDateAsString("01-10-2006");
CloudEventAttributes ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders())
CloudEventAttributesHelper ceAttributes = CloudEventMessageUtils.get(ceMessage.getHeaders())
.setSource("https://interface21.com/")
.setType("com.interface21");
......
......@@ -22,7 +22,6 @@ import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
......@@ -30,10 +29,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.function.cloudevent.CloudEventAttributesProvider;
import org.springframework.cloud.function.cloudevent.CloudEventJsonMessageConverter;
import org.springframework.cloud.function.cloudevent.CloudEventMessageUtils;
import org.springframework.cloud.function.cloudevent.DefaultCloudEventAttributesProvider;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
......@@ -49,7 +45,6 @@ import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.SocketUtils;
/**
......@@ -219,9 +214,9 @@ public class CloudeventDemoApplicationRESTTests {
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2050\",\"releaseName\":\"Spring Framework\",\"version\":\"10.0\"}");
assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE))
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("http://spring.io/application-application"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE))
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList(LinkedHashMap.class.getName()));
}
......@@ -236,12 +231,40 @@ public class CloudeventDemoApplicationRESTTests {
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}");
assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE))
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("http://spring.io/application-application"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE))
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
}
/*
* Typically this would never happen since spec mandates that HTTP uses 'ce-` prefix.
* So this is to primarily validate that we can recognize it process it and still produce correct headers
*/
@Test
public void testAsBinaryPojoToPojoWrongHeaders() throws Exception {
SpringApplication.run(new Class[] {CloudeventDemoApplication.class}, new String[] {});
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString());
headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/");
headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0");
headers.set(CloudEventMessageUtils.ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework");
String payload = "{\"releaseDate\":\"01-10-2006\", \"releaseName\":\"Spring Framework\", \"version\":\"1.0\"}";
RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/consumeAndProduceCloudEventAsPojoToPojo"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);
assertThat(response.getBody()).isEqualTo("{\"releaseDate\":\"01-10-2006\",\"releaseName\":\"Spring Framework\",\"version\":\"2.0\"}");
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE))
.isEqualTo(Collections.singletonList("http://spring.io/application-application"));
assertThat(response.getHeaders().get(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE))
.isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
}
@Test
public void testAsStructuralPojoToPojo() throws Exception {
ApplicationContext context = SpringApplication.run(CloudeventDemoApplication.class);
......@@ -281,9 +304,9 @@ public class CloudeventDemoApplicationRESTTests {
assertThat(springReleaseEvent.getVersion()).isEqualTo("10.0");
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_SOURCE))
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_SOURCE))
// .isEqualTo(Collections.singletonList("http://spring.io/application-application"));
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CE_TYPE))
// assertThat(response.getHeaders().get(CloudEventMessageUtils.CANONICAL_TYPE))
// .isEqualTo(Collections.singletonList(SpringReleaseEvent.class.getName()));
}
......@@ -294,10 +317,10 @@ public class CloudeventDemoApplicationRESTTests {
private HttpHeaders buildHeaders(MediaType contentType) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(contentType);
headers.set(CloudEventMessageUtils.CE_ID, UUID.randomUUID().toString());
headers.set(CloudEventMessageUtils.CE_SOURCE, "https://spring.io/");
headers.set(CloudEventMessageUtils.CE_SPECVERSION, "1.0");
headers.set(CloudEventMessageUtils.CE_TYPE, "org.springframework");
headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.ID, UUID.randomUUID().toString());
headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SOURCE, "https://spring.io/");
headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.SPECVERSION, "1.0");
headers.set(CloudEventMessageUtils.HTTP_ATTR_PREFIX + CloudEventMessageUtils.TYPE, "org.springframework");
return headers;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment