001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025
026import org.apache.activemq.broker.BrokerFactory;
027import org.apache.activemq.broker.BrokerFactoryHandler;
028import org.apache.activemq.broker.BrokerRegistry;
029import org.apache.activemq.broker.BrokerService;
030import org.apache.activemq.broker.TransportConnector;
031import org.apache.activemq.transport.MarshallingTransportFilter;
032import org.apache.activemq.transport.Transport;
033import org.apache.activemq.transport.TransportFactory;
034import org.apache.activemq.transport.TransportServer;
035import org.apache.activemq.util.IOExceptionSupport;
036import org.apache.activemq.util.IntrospectionSupport;
037import org.apache.activemq.util.ServiceSupport;
038import org.apache.activemq.util.URISupport;
039import org.apache.activemq.util.URISupport.CompositeData;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042import org.slf4j.MDC;
043
044public class VMTransportFactory extends TransportFactory {
045    
046    public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>();
047    public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>();
048    public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>();
049    private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class);
050    
051    BrokerFactoryHandler brokerFactoryHandler;
052
053    public Transport doConnect(URI location) throws Exception {
054        return VMTransportServer.configure(doCompositeConnect(location));
055    }
056
057    public Transport doCompositeConnect(URI location) throws Exception {
058        URI brokerURI;
059        String host;
060        Map<String, String> options;
061        boolean create = true;
062        int waitForStart = -1;
063        CompositeData data = URISupport.parseComposite(location);
064        if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) {
065            brokerURI = data.getComponents()[0];
066            CompositeData brokerData = URISupport.parseComposite(brokerURI);
067            host = (String)brokerData.getParameters().get("brokerName");
068            if (host == null) {
069                host = "localhost";
070            }
071            if (brokerData.getPath() != null) {
072                host = brokerData.getPath();
073            }
074            options = data.getParameters();
075            location = new URI("vm://" + host);
076        } else {
077            // If using the less complex vm://localhost?broker.persistent=true
078            // form
079            try {
080                host = extractHost(location);
081                options = URISupport.parseParameters(location);
082                String config = (String)options.remove("brokerConfig");
083                if (config != null) {
084                    brokerURI = new URI(config);
085                } else {
086                    Map brokerOptions = IntrospectionSupport.extractProperties(options, "broker.");
087                    brokerURI = new URI("broker://()/" + host + "?"
088                                        + URISupport.createQueryString(brokerOptions));
089                }
090                if ("false".equals(options.remove("create"))) {
091                    create = false;
092                }
093                String waitForStartString = options.remove("waitForStart");
094                if (waitForStartString != null) {
095                    waitForStart = Integer.parseInt(waitForStartString);
096                }
097            } catch (URISyntaxException e1) {
098                throw IOExceptionSupport.create(e1);
099            }
100            location = new URI("vm://" + host);
101        }
102        if (host == null) {
103            host = "localhost";
104        }
105        VMTransportServer server = SERVERS.get(host);
106        // validate the broker is still active
107        if (!validateBroker(host) || server == null) {
108            BrokerService broker = null;
109            // Synchronize on the registry so that multiple concurrent threads
110            // doing this do not think that the broker has not been created and
111            // cause multiple brokers to be started.
112            synchronized (BrokerRegistry.getInstance().getRegistryMutext()) {
113                broker = lookupBroker(BrokerRegistry.getInstance(), host, waitForStart);
114                if (broker == null) {
115                    if (!create) {
116                        throw new IOException("Broker named '" + host + "' does not exist.");
117                    }
118                    try {
119                        if (brokerFactoryHandler != null) {
120                            broker = brokerFactoryHandler.createBroker(brokerURI);
121                        } else {
122                            broker = BrokerFactory.createBroker(brokerURI);
123                        }
124                        broker.start();
125                        MDC.put("activemq.broker", broker.getBrokerName());
126                    } catch (URISyntaxException e) {
127                        throw IOExceptionSupport.create(e);
128                    }
129                    BROKERS.put(host, broker);
130                    BrokerRegistry.getInstance().getRegistryMutext().notifyAll();
131                }
132
133                server = SERVERS.get(host);
134                if (server == null) {
135                    server = (VMTransportServer)bind(location, true);
136                    TransportConnector connector = new TransportConnector(server);
137                    connector.setBrokerService(broker);
138                    connector.setUri(location);
139                    connector.setTaskRunnerFactory(broker.getTaskRunnerFactory());
140                    connector.start();
141                    CONNECTORS.put(host, connector);
142                }
143
144            }
145        }
146
147        VMTransport vmtransport = server.connect();
148        IntrospectionSupport.setProperties(vmtransport.peer, new HashMap<String,String>(options));
149        IntrospectionSupport.setProperties(vmtransport, options);
150        Transport transport = vmtransport;
151        if (vmtransport.isMarshal()) {
152            Map<String, String> optionsCopy = new HashMap<String, String>(options);
153            transport = new MarshallingTransportFilter(transport, createWireFormat(options),
154                                                       createWireFormat(optionsCopy));
155        }
156        if (!options.isEmpty()) {
157            throw new IllegalArgumentException("Invalid connect parameters: " + options);
158        }
159        return transport;
160    }
161
162   private static String extractHost(URI location) {
163       String host = location.getHost();
164       if (host == null || host.length() == 0) {
165           host = location.getAuthority();
166           if (host == null || host.length() == 0) {
167               host = "localhost";
168           }
169       }
170       return host;
171    }
172
173/**
174    * @param registry
175    * @param brokerName
176    * @param waitForStart - time in milliseconds to wait for a broker to appear
177    * @return
178    */
179    private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) {
180        BrokerService broker = null;
181        synchronized(registry.getRegistryMutext()) {
182            broker = registry.lookup(brokerName);
183            if (broker == null && waitForStart > 0) {
184                final long expiry = System.currentTimeMillis() + waitForStart;
185                while (broker == null  && expiry > System.currentTimeMillis()) {
186                    long timeout = Math.max(0, expiry - System.currentTimeMillis());
187                    try {
188                        LOG.debug("waiting for broker named: " + brokerName + " to start");
189                        registry.getRegistryMutext().wait(timeout);
190                    } catch (InterruptedException ignored) {
191                    }
192                    broker = registry.lookup(brokerName);
193                }
194            }
195        }
196        return broker;
197    }
198
199    public TransportServer doBind(URI location) throws IOException {
200        return bind(location, false);
201    }
202
203    /**
204     * @param location
205     * @return the TransportServer
206     * @throws IOException
207     */
208    private TransportServer bind(URI location, boolean dispose) throws IOException {
209        String host = extractHost(location);
210        LOG.debug("binding to broker: " + host);
211        VMTransportServer server = new VMTransportServer(location, dispose);
212        Object currentBoundValue = SERVERS.get(host);
213        if (currentBoundValue != null) {
214            throw new IOException("VMTransportServer already bound at: " + location);
215        }
216        SERVERS.put(host, server);
217        return server;
218    }
219
220    public static void stopped(VMTransportServer server) {
221        String host = extractHost(server.getBindURI());
222        stopped(host);
223    }
224
225    public static void stopped(String host) {
226        SERVERS.remove(host);
227        TransportConnector connector = CONNECTORS.remove(host);
228        if (connector != null) {
229            LOG.debug("Shutting down VM connectors for broker: " + host);
230            ServiceSupport.dispose(connector);
231            BrokerService broker = BROKERS.remove(host);
232            if (broker != null) {
233                ServiceSupport.dispose(broker);
234            }
235            MDC.remove("activemq.broker");
236        }
237    }
238
239    public BrokerFactoryHandler getBrokerFactoryHandler() {
240        return brokerFactoryHandler;
241    }
242
243    public void setBrokerFactoryHandler(BrokerFactoryHandler brokerFactoryHandler) {
244        this.brokerFactoryHandler = brokerFactoryHandler;
245    }
246
247    private boolean validateBroker(String host) {
248        boolean result = true;
249        if (BROKERS.containsKey(host) || SERVERS.containsKey(host) || CONNECTORS.containsKey(host)) {
250            // check the broker is still in the BrokerRegistry
251            TransportConnector connector = CONNECTORS.get(host);
252            if (BrokerRegistry.getInstance().lookup(host) == null
253                || (connector != null && connector.getBroker().isStopped())) {
254                result = false;
255                // clean-up
256                BROKERS.remove(host);
257                SERVERS.remove(host);
258                if (connector != null) {
259                    CONNECTORS.remove(host);
260                    if (connector != null) {
261                        ServiceSupport.dispose(connector);
262                    }
263                }
264            }
265        }
266        return result;
267    }
268}