001package ca.uhn.fhir.jpa.subscription.module.cache; 002 003/*- 004 * #%L 005 * HAPI FHIR Subscription Server 006 * %% 007 * Copyright (C) 2014 - 2020 University Health Network 008 * %% 009 * Licensed under the Apache License, Version 2.0 (the "License"); 010 * you may not use this file except in compliance with the License. 011 * You may obtain a copy of the License at 012 * 013 * http://www.apache.org/licenses/LICENSE-2.0 014 * 015 * Unless required by applicable law or agreed to in writing, software 016 * distributed under the License is distributed on an "AS IS" BASIS, 017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 018 * See the License for the specific language governing permissions and 019 * limitations under the License. 020 * #L% 021 */ 022 023import ca.uhn.fhir.jpa.api.IDaoRegistry; 024import ca.uhn.fhir.jpa.model.sched.HapiJob; 025import ca.uhn.fhir.jpa.model.sched.ISchedulerService; 026import ca.uhn.fhir.jpa.model.sched.ScheduledJobDefinition; 027import ca.uhn.fhir.jpa.searchparam.SearchParameterMap; 028import ca.uhn.fhir.jpa.searchparam.retry.Retrier; 029import ca.uhn.fhir.rest.api.server.IBundleProvider; 030import ca.uhn.fhir.rest.param.TokenOrListParam; 031import ca.uhn.fhir.rest.param.TokenParam; 032import com.google.common.annotations.VisibleForTesting; 033import org.apache.commons.lang3.time.DateUtils; 034import org.hl7.fhir.instance.model.api.IBaseResource; 035import org.hl7.fhir.r4.model.Subscription; 036import org.quartz.JobExecutionContext; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039import org.springframework.beans.factory.annotation.Autowired; 040import org.springframework.context.annotation.Lazy; 041import org.springframework.stereotype.Service; 042 043import javax.annotation.PostConstruct; 044import java.util.HashSet; 045import java.util.List; 046import java.util.Set; 047import java.util.concurrent.Semaphore; 048 049 050@Service 051@Lazy 052public class SubscriptionLoader { 053 private static final Logger ourLog = LoggerFactory.getLogger(SubscriptionLoader.class); 054 private static final int MAX_RETRIES = 60; // 60 * 5 seconds = 5 minutes 055 private final Object mySyncSubscriptionsLock = new Object(); 056 @Autowired 057 private ISubscriptionProvider mySubscriptionProvider; 058 @Autowired 059 private SubscriptionRegistry mySubscriptionRegistry; 060 @Autowired(required = false) 061 private IDaoRegistry myDaoRegistry; 062 private Semaphore mySyncSubscriptionsSemaphore = new Semaphore(1); 063 @Autowired 064 private ISchedulerService mySchedulerService; 065 066 /** 067 * Read the existing subscriptions from the database 068 */ 069 public void syncSubscriptions() { 070 if (myDaoRegistry != null && !myDaoRegistry.isResourceTypeSupported("Subscription")) { 071 return; 072 } 073 if (!mySyncSubscriptionsSemaphore.tryAcquire()) { 074 return; 075 } 076 try { 077 doSyncSubscriptionsWithRetry(); 078 } finally { 079 mySyncSubscriptionsSemaphore.release(); 080 } 081 } 082 083 @VisibleForTesting 084 void acquireSemaphoreForUnitTest() throws InterruptedException { 085 mySyncSubscriptionsSemaphore.acquire(); 086 } 087 088 089 @PostConstruct 090 public void scheduleJob() { 091 ScheduledJobDefinition jobDetail = new ScheduledJobDefinition(); 092 jobDetail.setId(getClass().getName()); 093 jobDetail.setJobClass(Job.class); 094 mySchedulerService.scheduleLocalJob(DateUtils.MILLIS_PER_MINUTE, jobDetail); 095 } 096 097 public static class Job implements HapiJob { 098 @Autowired 099 private SubscriptionLoader myTarget; 100 101 @Override 102 public void execute(JobExecutionContext theContext) { 103 myTarget.syncSubscriptions(); 104 } 105 } 106 107 @VisibleForTesting 108 public int doSyncSubscriptionsForUnitTest() { 109 // Two passes for delete flag to take effect 110 int first = doSyncSubscriptionsWithRetry(); 111 int second = doSyncSubscriptionsWithRetry(); 112 return first + second; 113 } 114 115 synchronized int doSyncSubscriptionsWithRetry() { 116 Retrier<Integer> syncSubscriptionRetrier = new Retrier<>(this::doSyncSubscriptions, MAX_RETRIES); 117 return syncSubscriptionRetrier.runWithRetry(); 118 } 119 120 private int doSyncSubscriptions() { 121 if (mySchedulerService.isStopping()) { 122 return 0; 123 } 124 125 synchronized (mySyncSubscriptionsLock) { 126 ourLog.debug("Starting sync subscriptions"); 127 SearchParameterMap map = new SearchParameterMap(); 128 map.add(Subscription.SP_STATUS, new TokenOrListParam() 129 // TODO KHS Ideally we should only be pulling ACTIVE subscriptions here, but this class is overloaded so that 130 // the @Scheduled task also activates requested subscriptions if their type was enabled after they were requested 131 // There should be a separate @Scheduled task that looks for requested subscriptions that need to be activated 132 // independent of the registry loading process. 133 .addOr(new TokenParam(null, Subscription.SubscriptionStatus.REQUESTED.toCode())) 134 .addOr(new TokenParam(null, Subscription.SubscriptionStatus.ACTIVE.toCode()))); 135 map.setLoadSynchronousUpTo(SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS); 136 137 IBundleProvider subscriptionBundleList = mySubscriptionProvider.search(map); 138 139 Integer subscriptionCount = subscriptionBundleList.size(); 140 assert subscriptionCount != null; 141 if (subscriptionCount >= SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS) { 142 ourLog.error("Currently over " + SubscriptionConstants.MAX_SUBSCRIPTION_RESULTS + " subscriptions. Some subscriptions have not been loaded."); 143 } 144 145 List<IBaseResource> resourceList = subscriptionBundleList.getResources(0, subscriptionCount); 146 147 Set<String> allIds = new HashSet<>(); 148 int changesCount = 0; 149 for (IBaseResource resource : resourceList) { 150 String nextId = resource.getIdElement().getIdPart(); 151 allIds.add(nextId); 152 boolean changed = mySubscriptionProvider.loadSubscription(resource); 153 if (changed) { 154 changesCount++; 155 } 156 } 157 158 mySubscriptionRegistry.unregisterAllSubscriptionsNotInCollection(allIds); 159 ourLog.debug("Finished sync subscriptions - found {}", resourceList.size()); 160 161 return changesCount; 162 } 163 } 164 165 @VisibleForTesting 166 public void setSubscriptionProviderForUnitTest(ISubscriptionProvider theSubscriptionProvider) { 167 mySubscriptionProvider = theSubscriptionProvider; 168 } 169} 170