001package ca.uhn.fhir.jpa.subscription.module.subscriber;
002
003import ca.uhn.fhir.context.FhirContext;
004import ca.uhn.fhir.interceptor.api.HookParams;
005import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
006import ca.uhn.fhir.interceptor.api.Pointcut;
007import ca.uhn.fhir.jpa.searchparam.matcher.InMemoryMatchResult;
008import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
009import ca.uhn.fhir.jpa.subscription.module.ResourceModifiedMessage;
010import ca.uhn.fhir.jpa.subscription.module.cache.ActiveSubscription;
011import ca.uhn.fhir.jpa.subscription.module.cache.SubscriptionRegistry;
012import ca.uhn.fhir.jpa.subscription.module.channel.SubscriptionChannelRegistry;
013import ca.uhn.fhir.jpa.subscription.module.matcher.ISubscriptionMatcher;
014import ca.uhn.fhir.rest.api.Constants;
015import ca.uhn.fhir.rest.api.EncodingEnum;
016import org.apache.commons.lang3.StringUtils;
017import org.hl7.fhir.instance.model.api.IBaseResource;
018import org.hl7.fhir.instance.model.api.IIdType;
019import org.slf4j.Logger;
020import org.slf4j.LoggerFactory;
021import org.springframework.beans.factory.annotation.Autowired;
022import org.springframework.messaging.Message;
023import org.springframework.messaging.MessageChannel;
024import org.springframework.messaging.MessageHandler;
025import org.springframework.messaging.MessagingException;
026import org.springframework.stereotype.Service;
027
028import java.util.Collection;
029
030import static org.apache.commons.lang3.ObjectUtils.defaultIfNull;
031import static org.apache.commons.lang3.StringUtils.isNotBlank;
032
033/*-
034 * #%L
035 * HAPI FHIR Subscription Server
036 * %%
037 * Copyright (C) 2014 - 2020 University Health Network
038 * %%
039 * Licensed under the Apache License, Version 2.0 (the "License");
040 * you may not use this file except in compliance with the License.
041 * You may obtain a copy of the License at
042 *
043 *      http://www.apache.org/licenses/LICENSE-2.0
044 *
045 * Unless required by applicable law or agreed to in writing, software
046 * distributed under the License is distributed on an "AS IS" BASIS,
047 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
048 * See the License for the specific language governing permissions and
049 * limitations under the License.
050 * #L%
051 */
052
053@Service
054public class SubscriptionMatchingSubscriber implements MessageHandler {
055        private Logger ourLog = LoggerFactory.getLogger(SubscriptionMatchingSubscriber.class);
056
057        @Autowired
058        private ISubscriptionMatcher mySubscriptionMatcher;
059        @Autowired
060        private FhirContext myFhirContext;
061        @Autowired
062        private SubscriptionRegistry mySubscriptionRegistry;
063        @Autowired
064        private IInterceptorBroadcaster myInterceptorBroadcaster;
065        @Autowired
066        private SubscriptionChannelRegistry mySubscriptionChannelRegistry;
067
068        @Override
069        public void handleMessage(Message<?> theMessage) throws MessagingException {
070                ourLog.trace("Handling resource modified message: {}", theMessage);
071
072                if (!(theMessage instanceof ResourceModifiedJsonMessage)) {
073                        ourLog.warn("Unexpected message payload type: {}", theMessage);
074                        return;
075                }
076
077                ResourceModifiedMessage msg = ((ResourceModifiedJsonMessage) theMessage).getPayload();
078                matchActiveSubscriptionsAndDeliver(msg);
079
080        }
081
082        public void matchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
083                switch (theMsg.getOperationType()) {
084                        case CREATE:
085                        case UPDATE:
086                        case MANUALLY_TRIGGERED:
087                                break;
088                        case DELETE:
089                        default:
090                                ourLog.trace("Not processing modified message for {}", theMsg.getOperationType());
091                                // ignore anything else
092                                return;
093                }
094
095                // Interceptor call: SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED
096                HookParams params = new HookParams()
097                        .add(ResourceModifiedMessage.class, theMsg);
098                if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_BEFORE_PERSISTED_RESOURCE_CHECKED, params)) {
099                        return;
100                }
101
102                try {
103                        doMatchActiveSubscriptionsAndDeliver(theMsg);
104                } finally {
105                        // Interceptor call: SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED
106                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_AFTER_PERSISTED_RESOURCE_CHECKED, params);
107                }
108        }
109
110        private void doMatchActiveSubscriptionsAndDeliver(ResourceModifiedMessage theMsg) {
111                IIdType resourceId = theMsg.getId(myFhirContext);
112                Boolean isText = false;
113
114                Collection<ActiveSubscription> subscriptions = mySubscriptionRegistry.getAll();
115
116                ourLog.trace("Testing {} subscriptions for applicability", subscriptions.size());
117                boolean resourceMatched = false;
118
119                for (ActiveSubscription nextActiveSubscription : subscriptions) {
120
121                        String nextSubscriptionId = getId(nextActiveSubscription);
122
123                        if (isNotBlank(theMsg.getSubscriptionId())) {
124                                if (!theMsg.getSubscriptionId().equals(nextSubscriptionId)) {
125                                        // TODO KHS we should use a hash to look it up instead of this full table scan
126                                        ourLog.debug("Ignoring subscription {} because it is not {}", nextSubscriptionId, theMsg.getSubscriptionId());
127                                        continue;
128                                }
129                        }
130
131                        if (!validCriteria(nextActiveSubscription, resourceId)) {
132                                continue;
133                        }
134
135                        InMemoryMatchResult matchResult = mySubscriptionMatcher.match(nextActiveSubscription.getSubscription(), theMsg);
136                        if (!matchResult.matched()) {
137                                continue;
138                        }
139                        ourLog.debug("Subscription {} was matched by resource {} {}",
140                                nextActiveSubscription.getId(),
141                                resourceId.toUnqualifiedVersionless().getValue(),
142                                matchResult.isInMemory() ? "in-memory" : "by querying the repository");
143
144                        IBaseResource payload = theMsg.getNewPayload(myFhirContext);
145                        CanonicalSubscription subscription = nextActiveSubscription.getSubscription();
146
147                        EncodingEnum encoding = null;
148                        if (subscription.getPayloadString() != null && !subscription.getPayloadString().isEmpty()) {
149                                encoding = EncodingEnum.forContentType(subscription.getPayloadString());
150                                isText = subscription.getPayloadString().equals(Constants.CT_TEXT);
151                        }
152                        encoding = defaultIfNull(encoding, EncodingEnum.JSON);
153
154                        ResourceDeliveryMessage deliveryMsg = new ResourceDeliveryMessage();
155
156                        deliveryMsg.setPayload(myFhirContext, payload, encoding);
157                        deliveryMsg.setSubscription(subscription);
158                        deliveryMsg.setOperationType(theMsg.getOperationType());
159                        deliveryMsg.copyAdditionalPropertiesFrom(theMsg);
160
161                        // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
162                        HookParams params = new HookParams()
163                                .add(CanonicalSubscription.class, nextActiveSubscription.getSubscription())
164                                .add(ResourceDeliveryMessage.class, deliveryMsg)
165                                .add(InMemoryMatchResult.class, matchResult);
166                        if (!myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_MATCHED, params)) {
167                                return;
168                        }
169
170                        resourceMatched |= sendToDeliveryChannel(nextActiveSubscription, deliveryMsg);
171                }
172
173                if (!resourceMatched) {
174                        // Interceptor call: SUBSCRIPTION_RESOURCE_MATCHED
175                        HookParams params = new HookParams()
176                                .add(ResourceModifiedMessage.class, theMsg);
177                        myInterceptorBroadcaster.callHooks(Pointcut.SUBSCRIPTION_RESOURCE_DID_NOT_MATCH_ANY_SUBSCRIPTIONS, params);
178                }
179        }
180
181        private boolean sendToDeliveryChannel(ActiveSubscription nextActiveSubscription, ResourceDeliveryMessage theDeliveryMsg) {
182                boolean retval = false;
183                ResourceDeliveryJsonMessage wrappedMsg = new ResourceDeliveryJsonMessage(theDeliveryMsg);
184                MessageChannel deliveryChannel = mySubscriptionChannelRegistry.get(nextActiveSubscription.getChannelName()).getChannel();
185                if (deliveryChannel != null) {
186                        retval = true;
187                        trySendToDeliveryChannel(wrappedMsg, deliveryChannel);
188                } else {
189                        ourLog.warn("Do not have delivery channel for subscription {}", nextActiveSubscription.getId());
190                }
191                return retval;
192        }
193
194        private void trySendToDeliveryChannel(ResourceDeliveryJsonMessage theWrappedMsg, MessageChannel theDeliveryChannel) {
195                try {
196                        boolean success = theDeliveryChannel.send(theWrappedMsg);
197                        if (!success) {
198                                ourLog.warn("Failed to send message to Delivery Channel.");
199                        }
200                } catch (RuntimeException e) {
201                        ourLog.error("Failed to send message to Delivery Channel", e);
202                        throw new RuntimeException("Failed to send message to Delivery Channel", e);
203                }
204        }
205
206        private String getId(ActiveSubscription theActiveSubscription) {
207                return theActiveSubscription.getId();
208        }
209
210        private boolean validCriteria(ActiveSubscription theActiveSubscription, IIdType theResourceId) {
211                String criteriaString = theActiveSubscription.getCriteriaString();
212                String subscriptionId = getId(theActiveSubscription);
213                String resourceType = theResourceId.getResourceType();
214
215                if (StringUtils.isBlank(criteriaString)) {
216                        return false;
217                }
218
219                // see if the criteria matches the created object
220                ourLog.trace("Checking subscription {} for {} with criteria {}", subscriptionId, resourceType, criteriaString);
221                String criteriaResource = criteriaString;
222                int index = criteriaResource.indexOf("?");
223                if (index != -1) {
224                        criteriaResource = criteriaResource.substring(0, criteriaResource.indexOf("?"));
225                }
226
227                if (resourceType != null && !criteriaResource.equals(resourceType)) {
228                        ourLog.trace("Skipping subscription search for {} because it does not match the criteria {}", resourceType, criteriaString);
229                        return false;
230                }
231
232                return true;
233        }
234}