001package ca.uhn.fhir.jpa.subscription.module.subscriber;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2020 University Health Network
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.interceptor.api.HookParams;
024import ca.uhn.fhir.interceptor.api.IInterceptorBroadcaster;
025import ca.uhn.fhir.jpa.subscription.module.CanonicalSubscription;
026import ca.uhn.fhir.interceptor.api.Pointcut;
027import ca.uhn.fhir.rest.api.EncodingEnum;
028import ca.uhn.fhir.rest.api.RequestTypeEnum;
029import ca.uhn.fhir.rest.client.api.*;
030import ca.uhn.fhir.rest.client.interceptor.SimpleRequestHeaderInterceptor;
031import ca.uhn.fhir.rest.gclient.IClientExecutable;
032import ca.uhn.fhir.rest.server.exceptions.ResourceGoneException;
033import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
034import org.apache.commons.lang3.StringUtils;
035import org.hl7.fhir.instance.model.api.IBaseResource;
036import org.hl7.fhir.instance.model.api.IIdType;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.springframework.beans.factory.annotation.Autowired;
040import org.springframework.context.annotation.Scope;
041import org.springframework.messaging.MessagingException;
042import org.springframework.stereotype.Component;
043
044import java.io.IOException;
045import java.util.*;
046
047import static org.apache.commons.lang3.StringUtils.isNotBlank;
048
049@Component
050@Scope("prototype")
051public class SubscriptionDeliveringRestHookSubscriber extends BaseSubscriptionDeliverySubscriber {
052        @Autowired
053        IResourceRetriever myResourceRetriever;
054        private Logger ourLog = LoggerFactory.getLogger(SubscriptionDeliveringRestHookSubscriber.class);
055
056        protected void deliverPayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient) {
057                IBaseResource payloadResource = getAndMassagePayload(theMsg, theSubscription);
058
059                // Regardless of whether we have a payload, the rest-hook should be sent.
060                doDelivery(theMsg, theSubscription, thePayloadType, theClient, payloadResource);
061        }
062
063        protected void doDelivery(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription, EncodingEnum thePayloadType, IGenericClient theClient, IBaseResource thePayloadResource) {
064                IClientExecutable<?, ?> operation;
065                switch (theMsg.getOperationType()) {
066                        case CREATE:
067                        case UPDATE:
068                                if (thePayloadResource == null || thePayloadResource.isEmpty()) {
069                                        if (thePayloadType != null) {
070                                                operation = theClient.create().resource(thePayloadResource);
071                                        } else {
072                                                sendNotification(theMsg);
073                                                return;
074                                        }
075                                } else {
076                                        if (thePayloadType != null) {
077                                                operation = theClient.update().resource(thePayloadResource);
078                                        } else {
079                                                sendNotification(theMsg);
080                                                return;
081                                        }
082                                }
083                                break;
084                        case DELETE:
085                                operation = theClient.delete().resourceById(theMsg.getPayloadId(myFhirContext));
086                                break;
087                        default:
088                                ourLog.warn("Ignoring delivery message of type: {}", theMsg.getOperationType());
089                                return;
090                }
091
092                if (thePayloadType != null) {
093                        operation.encoded(thePayloadType);
094                }
095
096                String payloadId = null;
097                if (thePayloadResource != null) {
098                        payloadId = thePayloadResource.getIdElement().toUnqualified().getValue();
099                }
100                ourLog.info("Delivering {} rest-hook payload {} for {}", theMsg.getOperationType(), payloadId, theSubscription.getIdElement(myFhirContext).toUnqualifiedVersionless().getValue());
101
102                try {
103                        operation.execute();
104                } catch (ResourceNotFoundException e) {
105                        ourLog.error("Cannot reach {} ", theMsg.getSubscription().getEndpointUrl());
106                        ourLog.error("Exception: ", e);
107                        throw e;
108                }
109        }
110
111        protected IBaseResource getAndMassagePayload(ResourceDeliveryMessage theMsg, CanonicalSubscription theSubscription) {
112                IBaseResource payloadResource = theMsg.getPayload(myFhirContext);
113
114                if (payloadResource == null || theSubscription.getRestHookDetails().isDeliverLatestVersion()) {
115                        IIdType payloadId = theMsg.getPayloadId(myFhirContext);
116
117                        try {
118                                if (payloadId != null) {
119                                        payloadResource = myResourceRetriever.getResource(payloadId.toVersionless());
120                                } else {
121                                        return null;
122                                }
123                        } catch (ResourceGoneException e) {
124                                ourLog.warn("Resource {} is deleted, not going to deliver for subscription {}", payloadId.toVersionless(), theSubscription.getIdElement(myFhirContext));
125                                return null;
126                        }
127                }
128
129                IIdType resourceId = payloadResource.getIdElement();
130                if (theSubscription.getRestHookDetails().isStripVersionId()) {
131                        resourceId = resourceId.toVersionless();
132                        payloadResource.setId(resourceId);
133                }
134                return payloadResource;
135        }
136
137        @Override
138        public void handleMessage(ResourceDeliveryMessage theMessage) throws MessagingException {
139                CanonicalSubscription subscription = theMessage.getSubscription();
140
141                // Interceptor call: SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY
142                HookParams params = new HookParams()
143                        .add(CanonicalSubscription.class, subscription)
144                        .add(ResourceDeliveryMessage.class, theMessage);
145                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_BEFORE_REST_HOOK_DELIVERY, params)) {
146                        return;
147                }
148
149                // Grab the endpoint from the subscription
150                String endpointUrl = subscription.getEndpointUrl();
151
152                // Grab the payload type (encoding mimetype) from the subscription
153                String payloadString = subscription.getPayloadString();
154                EncodingEnum payloadType = null;
155                if (payloadString != null) {
156                        payloadType = EncodingEnum.forContentType(payloadString);
157                }
158
159                // Create the client request
160                myFhirContext.getRestfulClientFactory().setServerValidationMode(ServerValidationModeEnum.NEVER);
161                IGenericClient client = null;
162                if (isNotBlank(endpointUrl)) {
163                        client = myFhirContext.newRestfulGenericClient(endpointUrl);
164
165                        // Additional headers specified in the subscription
166                        List<String> headers = subscription.getHeaders();
167                        for (String next : headers) {
168                                if (isNotBlank(next)) {
169                                        client.registerInterceptor(new SimpleRequestHeaderInterceptor(next));
170                                }
171                        }
172                }
173
174                deliverPayload(theMessage, subscription, payloadType, client);
175
176                // Interceptor call: SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY
177                params = new HookParams()
178                        .add(CanonicalSubscription.class, subscription)
179                        .add(ResourceDeliveryMessage.class, theMessage);
180                if (!getInterceptorBroadcaster().callHooks(Pointcut.SUBSCRIPTION_AFTER_REST_HOOK_DELIVERY, params)) {
181                        //noinspection UnnecessaryReturnStatement
182                        return;
183                }
184
185        }
186
187        /**
188         * Sends a POST notification without a payload
189         */
190        protected void sendNotification(ResourceDeliveryMessage theMsg) {
191                Map<String, List<String>> params = new HashMap<>();
192                List<Header> headers = new ArrayList<>();
193                if (theMsg.getSubscription().getHeaders() != null) {
194                        theMsg.getSubscription().getHeaders().stream().filter(Objects::nonNull).forEach(h -> {
195                                final int sep = h.indexOf(':');
196                                if (sep > 0) {
197                                        final String name = h.substring(0, sep);
198                                        final String value = h.substring(sep + 1);
199                                        if (StringUtils.isNotBlank(name)) {
200                                                headers.add(new Header(name.trim(), value.trim()));
201                                        }
202                                }
203                        });
204                }
205
206                StringBuilder url = new StringBuilder(theMsg.getSubscription().getEndpointUrl());
207                IHttpClient client = myFhirContext.getRestfulClientFactory().getHttpClient(url, params, "", RequestTypeEnum.POST, headers);
208                IHttpRequest request = client.createParamRequest(myFhirContext, params, null);
209                try {
210                        IHttpResponse response = request.execute();
211                        // close connection in order to return a possible cached connection to the connection pool
212                        response.close();
213                } catch (IOException e) {
214                        ourLog.error("Error trying to reach " + theMsg.getSubscription().getEndpointUrl());
215                        e.printStackTrace();
216                        throw new ResourceNotFoundException(e.getMessage());
217                }
218        }
219}