001package ca.uhn.fhir.jpa.subscription.module.cache;
002
003/*-
004 * #%L
005 * HAPI FHIR Subscription Server
006 * %%
007 * Copyright (C) 2014 - 2020 University Health Network
008 * %%
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *      http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 * #L%
021 */
022
023import ca.uhn.fhir.jpa.api.IDaoRegistry;
024import ca.uhn.fhir.jpa.model.sched.HapiJob;
025import ca.uhn.fhir.jpa.model.sched.ISchedulerService;
026import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition;
027import ca.uhn.fhir.jpa.searchparam.SearchParameterMap;
028import ca.uhn.fhir.jpa.searchparam.retry.Retrier;
029import ca.uhn.fhir.rest.api.server.IBundleProvider;
030import ca.uhn.fhir.rest.param.TokenOrListParam;
031import ca.uhn.fhir.rest.param.TokenParam;
032import com.google.common.annotations.VisibleForTesting;
033import org.apache.commons.lang3.time.DateUtils;
034import org.hl7.fhir.instance.model.api.IBaseResource;
035import org.hl7.fhir.r4.model.Subscription;
036import org.quartz.JobExecutionContext;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039import org.springframework.beans.factory.annotation.Autowired;
040import org.springframework.context.annotation.Lazy;
041import org.springframework.stereotype.Service;
042
043import javax.annotation.PostConstruct;
044import java.util.HashSet;
045import java.util.List;
046import java.util.Set;
047import java.util.concurrent.Semaphore;
048
049
050@Service
051@Lazy
052public class SubscriptionLoader {
053        private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class);
054        private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes
055        private final Object mySyncSubscriptionsLock = new Object();
056        @Autowired
057        private ISubscriptionProvider mySubscriptionProvider;
058        @Autowired
059        private SubscriptionRegistry mySubscriptionRegistry;
060        @Autowired(required = false)
061        private IDaoRegistry myDaoRegistry;
062        private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1);
063        @Autowired
064        private ISchedulerService mySchedulerService;
065
066        /**
067         * Read the existing subscriptions from the database
068         */
069        public void syncSubscriptions() {
070                if (myDaoRegistry != null && !myDaoRegistry.isResourceTypeSupported("Subscription")) {
071                        return;
072                }
073                if (!mySyncSubscriptionsSemaphore.tryAcquire()) {
074                        return;
075                }
076                try {
077                        doSyncSubscriptionsWithRetry();
078                } finally {
079                        mySyncSubscriptionsSemaphore.release();
080                }
081        }
082
083        @VisibleForTesting
084        void acquireSemaphoreForUnitTest() throws InterruptedException {
085                mySyncSubscriptionsSemaphore.acquire();
086        }
087
088
089        @PostConstruct
090        public void scheduleJob() {
091                ScheduledJobDefinition jobDetail = new ScheduledJobDefinition();
092                jobDetail.setId(getClass().getName());
093                jobDetail.setJobClass(Job.class);
094                mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDetail);
095        }
096
097        public static class Job implements HapiJob {
098                @Autowired
099                private SubscriptionLoader myTarget;
100
101                @Override
102                public void execute(JobExecutionContext theContext) {
103                        myTarget.syncSubscriptions();
104                }
105        }
106
107        @VisibleForTesting
108        public int doSyncSubscriptionsForUnitTest() {
109                // Two passes for delete flag to take effect
110                int first = doSyncSubscriptionsWithRetry();
111                int second = doSyncSubscriptionsWithRetry();
112                return first + second;
113        }
114
115        synchronized int doSyncSubscriptionsWithRetry() {
116                Retrier<Integer> syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES);
117                return syncSubscriptionRetrier.runWithRetry();
118        }
119
120        private int doSyncSubscriptions() {
121                if (mySchedulerService.isStopping()) {
122                        return 0;
123                }
124
125                synchronized (mySyncSubscriptionsLock) {
126                        ourLog.debug("Starting sync subscriptions");
127                        SearchParameterMap map = new SearchParameterMap();
128                        map.add(Subscription.SP_STATUS, new TokenOrListParam()
129                                // TODO KHS Ideally we should only be pulling ACTIVE subscriptions here, but this class is overloaded so that
130                                // the @Scheduled task also activates requested subscriptions if their type was enabled after they were requested
131                                // There should be a separate @Scheduled task that looks for requested subscriptions that need to be activated
132                                // independent of the registry loading process.
133                                .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
134                                .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode())));
135                        map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS);
136
137                        IBundleProvider subscriptionBundleList = mySubscriptionProvider.search(map);
138
139                        Integer subscriptionCount = subscriptionBundleList.size();
140                        assert subscriptionCount != null;
141                        if (subscriptionCount >= SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS) {
142                                ourLog.error("Currently over " + SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS + " subscriptions.  Some subscriptions have not been loaded.");
143                        }
144
145                        List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionCount);
146
147                        Set<String> allIds = new HashSet<>();
148                        int changesCount = 0;
149                        for (IBaseResource resource : resourceList) {
150                                String nextId = resource.getIdElement().getIdPart();
151                                allIds.add(nextId);
152                                boolean changed = mySubscriptionProvider.loadSubscription(resource);
153                                if (changed) {
154                                        changesCount++;
155                                }
156                        }
157
158                        mySubscriptionRegistry.unregisterAllSubscriptionsNotInCollection(allIds);
159                        ourLog.debug("Finished sync subscriptions - found {}", resourceList.size());
160
161                        return changesCount;
162                }
163        }
164
165        @VisibleForTesting
166        public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) {
167                mySubscriptionProvider = theSubscriptionProvider;
168        }
169}
170