Implementing Continuous Querying
Use continuous querying in your clients to receive continuous updates to queries run on the servers.
Before you begin, you should be familiar with Querying and have your client/server system configured.
Configure the client pools you will use for CQs with subscription-enabled set to true.
To have CQ and interest subscription events arrive as closely together as possible, use a single pool for everything. Different pools might use different servers, which can lead to greater differences in event delivery time.
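As a rough illustration of the pool setting described above, a client cache.xml might declare a single subscription-enabled pool and point the queried region at it. This is a sketch only: the pool name cqPool, the locator host, and the port are placeholder assumptions, not values from this guide.

```xml
<client-cache>
  <!-- One pool for CQs and interest registration; name, host, and port are placeholders -->
  <pool name="cqPool" subscription-enabled="true">
    <locator host="localhost" port="10334"/>
  </pool>
  <region name="tradeOrder">
    <region-attributes refid="PROXY" pool-name="cqPool"/>
  </region>
</client-cache>
```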
Write your OQL query to retrieve the data you need from the server.
The query must satisfy these CQ requirements in addition to the standard GemFire querying specifications:
- The FROM clause must contain only a single region specification, with optional iterator variable.
- The query must be a SELECT expression only, preceded by zero or more IMPORT statements. This means the query cannot be a statement such as "/tradeOrder.name" or "(SELECT * from /tradeOrder).size".
- The CQ query cannot use:
  - Cross region joins
  - Drill-downs into nested collections
  - DISTINCT
  - Projections
  - Bind parameters
  - ORDER BY
  - GROUP BY
  - Aggregate Functions (MIN, MAX, AVG, SUM, COUNT)
- The CQ query must be created on a partitioned or replicated region. See Region Type Restrictions for CQs.
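Some of the requirements above can be caught on the client before the query is ever sent. The following is a minimal sketch of such a pre-check; the class CqQueryPrecheck and its check method are hypothetical helpers, not part of the product API. It does only simple text matching, so it does not detect cross-region joins, projections, or nested-collection drill-downs, and it can report false positives for keywords that appear inside string literals. The server remains the authority on whether a CQ is valid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: a text-level sanity check, not the server's validation.
public class CqQueryPrecheck {

    // Label and case-insensitive pattern for constructs the CQ requirements rule out.
    private static final String[][] FORBIDDEN = {
        {"DISTINCT", "\\bDISTINCT\\b"},
        {"ORDER BY", "\\bORDER\\s+BY\\b"},
        {"GROUP BY", "\\bGROUP\\s+BY\\b"},
        {"aggregate function", "\\b(MIN|MAX|AVG|SUM|COUNT)\\s*\\("},
        {"bind parameter", "\\$\\d+"}
    };

    // Leading IMPORT statements are allowed, so they are stripped before the SELECT check.
    private static final Pattern LEADING_IMPORTS =
        Pattern.compile("^(\\s*IMPORT\\s+[^;]+;)*\\s*", Pattern.CASE_INSENSITIVE);

    /** Returns the problems found; an empty list means no obvious violation was detected. */
    public static List<String> check(String query) {
        List<String> problems = new ArrayList<>();
        String body = LEADING_IMPORTS.matcher(query).replaceFirst("");
        if (!body.regionMatches(true, 0, "SELECT", 0, 6)) {
            problems.add("not a SELECT expression");
        }
        for (String[] rule : FORBIDDEN) {
            if (Pattern.compile(rule[1], Pattern.CASE_INSENSITIVE).matcher(body).find()) {
                problems.add(rule[0]);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(check("SELECT * FROM /tradeOrder t WHERE t.price > 100.00"));
        System.out.println(check("(SELECT * from /tradeOrder).size"));
        System.out.println(check("SELECT DISTINCT * FROM /tradeOrder ORDER BY price"));
    }
}
```

A client could run a check like this when CQ strings come from configuration or user input, and log the reported problems before calling the query service.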
The basic syntax for the CQ query is:
SELECT * FROM /fullRegionPath [iterator] [WHERE clause]
This example query could be used to get all trade orders where the price is over $100:
SELECT * FROM /tradeOrder t WHERE t.price > 100.00
Write your CQ listeners to handle CQ events from the server. Implement org.apache.geode.cache.query.CqListener in each event handler you need. In addition to your main CQ listeners, you might have listeners that you use for all CQs to track statistics or other general information.
Note: Be especially careful if you choose to update your cache from your CqListener. If your listener updates the region that is queried in its own CQ and that region has a Pool named, the update will be forwarded to the server. If the update on the server satisfies the same CQ, it may be returned to the same listener that did the update, which could put your application into an infinite loop. This same scenario could be played out with multiple regions and multiple CQs, if the listeners are programmed to update each other's regions.
This example outlines a CqListener that might be used to update a display screen with current data from the server. The listener gets the queryOperation and entry key and value from the CqEvent and then updates the screen according to the type of queryOperation.
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqListener;

// CqListener class
public class TradeEventListener implements CqListener {
  public void onEvent(CqEvent cqEvent) {
    // org.apache.geode.cache Operation associated with the query op
    Operation queryOperation = cqEvent.getQueryOperation();
    // key and new value from the event
    Object key = cqEvent.getKey();
    TradeOrder tradeOrder = (TradeOrder) cqEvent.getNewValue();
    if (queryOperation.isUpdate()) {
      // update data on the screen for the trade order . . .
    } else if (queryOperation.isCreate()) {
      // add the trade order to the screen . . .
    } else if (queryOperation.isDestroy()) {
      // remove the trade order from the screen . . .
    }
  }
  public void onError(CqEvent cqEvent) {
    // handle the error
  }
  // From CacheCallback
  public void close() {
    // close the output screen for the trades . . .
  }
}
When you install the listener and run the query, your listener will handle all of the CQ results.
If you need your CQs to detect whether they are connected to any of the servers that host their subscription queues, implement a CqStatusListener instead of a CqListener.
CqStatusListener extends the current CqListener, allowing a client to detect when a CQ is connected and/or disconnected from the server(s). The onCqConnected() method will be invoked when the CQ is connected, and when the CQ has been reconnected after being disconnected. The onCqDisconnected() method will be invoked when the CQ is no longer connected to any servers.
Taking the TradeEventListener example above, we can instead implement a CqStatusListener:
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqStatusListener;

public class TradeEventListener implements CqStatusListener {
  public void onEvent(CqEvent cqEvent) {
    // org.apache.geode.cache Operation associated with the query op
    Operation queryOperation = cqEvent.getQueryOperation();
    // key and new value from the event
    Object key = cqEvent.getKey();
    TradeOrder tradeOrder = (TradeOrder) cqEvent.getNewValue();
    if (queryOperation.isUpdate()) {
      // update data on the screen for the trade order . . .
    } else if (queryOperation.isCreate()) {
      // add the trade order to the screen . . .
    } else if (queryOperation.isDestroy()) {
      // remove the trade order from the screen . . .
    }
  }
  public void onError(CqEvent cqEvent) {
    // handle the error
  }
  // From CacheCallback
  public void close() {
    // close the output screen for the trades . . .
  }
  public void onCqConnected() {
    // Display connected symbol
  }
  public void onCqDisconnected() {
    // Display disconnected symbol
  }
}
When you install the CqStatusListener, your listener will be able to detect its connection status to the servers that it is querying.
Program your client to run the CQ:
- Create a CqAttributesFactory and use it to set your CqListeners and CqStatusListener.
- Pass the CqAttributes created by the factory, the CQ query, and its unique name to the QueryService to create a new CqQuery.
- Start the query running by calling one of the execute methods on the CqQuery object. You can execute with or without an initial result set.
- When you are done with the CQ, close it.
Continuous Query Implementation
// Get cache and queryService - refs to local cache and QueryService
// Create client /tradeOrder region configured to talk to the server
// Create CqAttributes using CqAttributesFactory
CqAttributesFactory cqf = new CqAttributesFactory();
// Create a listener and add it to the CQ attributes callback defined below
CqListener tradeEventListener = new TradeEventListener();
cqf.addCqListener(tradeEventListener);
CqAttributes cqa = cqf.create();
// Name of the CQ and its query
String cqName = "priceTracker";
String queryStr = "SELECT * FROM /tradeOrder t where t.price > 100.00";
// Create the CqQuery
CqQuery priceTracker = queryService.newCq(cqName, queryStr, cqa);
try {
  // Execute CQ, getting the optional initial result set
  // Without the initial result set, the call is priceTracker.execute();
  SelectResults sResults = priceTracker.executeWithInitialResults();
  for (Object o : sResults) {
    Struct s = (Struct) o;
    TradeOrder to = (TradeOrder) s.get("value");
    System.out.println("Initial result includes: " + to);
  }
} catch (Exception ex) {
  ex.printStackTrace();
}
// Now the CQ is running on the server, sending CqEvents to the listener
. . .
// End of life for the CQ - clear up resources by closing
priceTracker.close();
With continuous queries, you can optionally implement:
- Highly available CQs by configuring your servers for high availability.
- Durable CQs by configuring your clients for durable messaging and indicating which CQs are durable at creation.