Apache Geode Native C++ CHANGELOG

Function Execution

A client can invoke a server-resident function, with parameters, and can collect and operate on the returned results.

Server-side Requirements

To be callable from your client, a function must be

  • resident on the server, and
  • registered as available for client access.

See Executing a Function in Apache Geode in the Geode User Guide for details on how to write and register server-resident functions.

Client-side Requirements

The client must connect to the server through a connection pool in order to invoke a server-side function.

How Functions Execute

  1. The calling client application runs the execute method on the Execution object. The function must already be registered on the servers.
  2. The function is invoked on the servers where it needs to run. The servers are determined by the FunctionService on* method calls, region configuration, and any filters.
  3. If the function has results, the result is returned in a ResultCollector object.
  4. The client collects results using the ResultCollector.getResult() method.

In every client where you want to execute the function and process the results:

  • Use one of the FunctionService on* methods to create an Execution object. The on* methods, onRegion, onServer and onServers, define the highest level where the function is run.
  • If you use onRegion you can further narrow your run scope by setting key filters.
  • A function run using onRegion is a data-dependent function; all others are data-independent functions.
  • You can run a data-dependent function against partitioned and colocated partitioned regions. From the client, provide the appropriate key sets to the function call.

  • The Execution object allows you to customize the invocation by:

    • Providing a set of data keys to withFilter to narrow the execution scope. This works only for onRegion Execution objects (data-dependent functions).
    • Providing function arguments to withArgs.
    • Defining a custom ResultCollector for withCollector.
  • Call the Execution.execute() method to run the function.

Processing Function Results

To get the results from the function in the client app, use the result collector returned from the function execution. The getResult methods of the default result collector block until all results are received, then return the full result set.

The client can use the default result collector. If the client needs special results handling, code a custom ResultCollector implementation to replace the default, and use the Execution::withCollector method to specify it. To handle the results in a custom manner:

  1. Write a class that implements the ResultCollector interface. Its methods are of two types: some receive data and information from Geode and populate the result set, while another returns the compiled results to the calling application:
    • addResult is called when results arrive from the Function methods. Use addResult to add a single result to the ResultCollector.
    • endResults is called to signal the end of all results from the function execution.
    • getResult is available to your executing application (the one that calls Execution.execute) to retrieve the results. This may block until all results are available.
    • clearResults is called to clear partial results from the results collector. This is used only for highly available onRegion functions where the calling application waits for the results. If the call fails, before Geode retries the execution, it calls clearResults to ready the instance for a clean set of results.
  2. Use the Execution object in your executing member to call withCollector, passing your custom collector.
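
The collector life cycle described above can be sketched in plain C++. This is a minimal illustration, not the Geode API: ResultCollectorSketch is a hypothetical stand-in for the real ResultCollector interface, results are modeled as std::string rather than Cacheable objects, and the timeout parameter on getResult is an assumption made for the sketch. VectorCollector shows the four roles: addResult stores one result, endResults marks the set complete, getResult blocks until completion (or the timeout), and clearResults resets the collector before a retry.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the collector interface described above.
// Strings replace Geode's Cacheable results so the sketch compiles
// without the Geode headers.
class ResultCollectorSketch {
 public:
  virtual ~ResultCollectorSketch() = default;
  virtual void addResult(const std::string& result) = 0;
  virtual void endResults() = 0;
  virtual void clearResults() = 0;
  virtual std::vector<std::string> getResult(
      std::chrono::milliseconds timeout) = 0;
};

// A custom collector that stores results in arrival order.
class VectorCollector : public ResultCollectorSketch {
 public:
  // Called once per arriving result.
  void addResult(const std::string& result) override {
    std::lock_guard<std::mutex> lock(mutex_);
    results_.push_back(result);
  }

  // Called once when the function execution has sent all results.
  void endResults() override {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    ready_.notify_all();
  }

  // Called before a retry so that partial results are discarded.
  void clearResults() override {
    std::lock_guard<std::mutex> lock(mutex_);
    results_.clear();
    done_ = false;
  }

  // Blocks until endResults has been called or the timeout expires,
  // then returns a copy of whatever has been collected.
  std::vector<std::string> getResult(
      std::chrono::milliseconds timeout) override {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait_for(lock, timeout, [this] { return done_; });
    return results_;
  }

 private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::vector<std::string> results_;
  bool done_ = false;
};
```

In a real client, a collector written against the Geode ResultCollector interface would be passed to Execution::withCollector; the locking shown here reflects the assumption that results may arrive on a different thread from the one calling getResult.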

Function Execution Example

The native client release contains examples of function execution in ../examples/cpp/functionexecution.

  • The example begins with a server-side script that runs gfsh commands to create a region, simply called “partition_region”.
  • The function is preloaded on the server from a JAR file containing the server-side Java function code.
  • The function, called “ExampleMultiGetFunction”, is defined in the examples/utilities directory of your distribution. As its input parameter, the function takes an array of keys, then performs a get on each key and returns an array containing the results.
  • The function does not load values into the data store. That is a separate operation, performed in these examples by the client, and does not involve the server-side function.
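
To make the function's contract concrete, the following is a plain C++ model of the behavior described above: given a list of keys, perform one get per key and return the values in the same order. It is only a sketch. The real ExampleMultiGetFunction is server-side Java code; RegionModel and multiGetModel are hypothetical names, the region is modeled as a std::map, and returning an empty string for a missing key is an assumption made for this illustration.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical in-memory stand-in for the server-side region.
using RegionModel = std::map<std::string, std::string>;

// Models the described contract: one get per key, values returned in
// key order. An empty string for a missing key is an assumption of
// this sketch, not documented behavior of the real function.
std::vector<std::string> multiGetModel(const RegionModel& region,
                                       const std::vector<std::string>& keys) {
  std::vector<std::string> values;
  values.reserve(keys.size());
  for (const auto& key : keys) {
    const auto found = region.find(key);
    values.push_back(found != region.end() ? found->second : std::string());
  }
  return values;
}
```

Note that the model only reads from the region, which matches the point above that loading values is a separate operation performed by the client.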

As prerequisites, the client code must be aware of the connection to the server, the name of the function, and the expected type/format of the input parameter and return value.

The client:

  • creates an execution object
  • provides the execution object with a populated input parameter array
  • invokes the object’s execute method to invoke the server-side function

If the client expects results, it must create a result object. The C++ example uses the built-in result collector (ResultCollector::getResult()) to retrieve the function results.

The example creates a result variable to hold the results from the collector.

C++ Example

This section contains code snippets showing highlights of the C++ function execution example. They are not intended for cut-and-paste execution. For the complete source, see the example source directory.

The C++ example creates a cache.

Cache setupCache() {
  return CacheFactory()
      .set("log-level", "none")
      .create();
}

The example client uses the cache to create a connection pool:

void createPool(const Cache& cache) {
  auto pool = cache.getPoolManager()
      .createFactory()
      .addServer("localhost", EXAMPLE_SERVER_PORT)
      .create("pool");
}

Then, using that pool, the client creates a region with the same characteristics and name as the server-side region (partition_region).

std::shared_ptr<Region> createRegion(Cache& cache) {
  auto regionFactory = cache.createRegionFactory(RegionShortcut::PROXY);
  auto region = regionFactory.setPoolName("pool").create("partition_region");

  return region;
}

The sample client populates the server’s datastore with values, using the API and some sample key-value pairs.

void populateRegion(const std::shared_ptr<Region>& region) {
  for (std::size_t i = 0; i < keys.size(); i++) {
    region->put(keys[i], values[i]);
  }
}

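As confirmation that the data has been stored, the sample client can use the API to retrieve the values and write them to the console, without reference to the server-side example function. The snippet that follows shows the next step: building the vector of keys that is passed to the function as its input parameter.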

std::shared_ptr<CacheableVector> populateArguments() {
  auto arguments = CacheableVector::create();
  for (std::size_t i = 0; i < keys.size(); i++) {
    arguments->push_back(CacheableKey::create(keys[i]));
  }
  return arguments;
}

Next, the client retrieves those same values using the server-side example function, passing the array of keys as the input parameter.

std::vector<std::string> executeFunctionOnServer(const std::shared_ptr<Region> region,
    const std::shared_ptr<CacheableVector> arguments) {
  std::vector<std::string> resultList;

The client creates an execution object using FunctionService::onServer, passing the region service obtained from the region.

  auto functionService = FunctionService::onServer(region->getRegionService());

The client then calls the server-side function by name (held in getFuncIName in this snippet), passing its input arguments, and stores the results in a vector.

  if(auto executeFunctionResult = functionService.withArgs(arguments).execute(getFuncIName)->getResult()) {
    for (auto &arrayList: *executeFunctionResult) {
      for (auto &cachedString: *std::dynamic_pointer_cast<CacheableArrayList>(arrayList)) {
        resultList.push_back(std::dynamic_pointer_cast<CacheableString>(cachedString)->value());
      }
    }
  } else {
    std::cout << "get executeFunctionResult is NULL\n";
  }

  return resultList;
}
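
The nested loop above assumes that every result is a list and that every element of that list is a string. Because std::dynamic_pointer_cast returns a null pointer when the types do not match, a more defensive variant checks each cast before dereferencing. The following sketch shows that pattern with hypothetical stand-in types (Item, StringItem, ListItem) in place of Geode's Cacheable, CacheableString, and CacheableArrayList, so that it compiles without the Geode headers. Silently skipping unexpected entries is a design choice of this sketch; logging or throwing may be more appropriate in a real client.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the cacheable types used in the example.
struct Item {
  virtual ~Item() = default;
};

struct StringItem : Item {
  explicit StringItem(std::string v) : value(std::move(v)) {}
  std::string value;
};

struct ListItem : Item {
  std::vector<std::shared_ptr<Item>> items;
};

// Flattens a list of lists of strings, skipping any entry whose
// runtime type is not the expected one instead of dereferencing null.
std::vector<std::string> flattenResults(
    const std::vector<std::shared_ptr<Item>>& results) {
  std::vector<std::string> flat;
  for (const auto& entry : results) {
    const auto list = std::dynamic_pointer_cast<ListItem>(entry);
    if (!list) {
      continue;  // not a list: ignore in this sketch
    }
    for (const auto& element : list->items) {
      if (const auto text = std::dynamic_pointer_cast<StringItem>(element)) {
        flat.push_back(text->value);
      }
    }
  }
  return flat;
}
```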

It then loops through the results vector and prints the retrieved values.

void printResults(const std::vector<std::string>& resultList) {
  std::cout << "Result count = " << resultList.size() << std::endl << std::endl;
  int i = 0;
  for (auto &cachedString: resultList) {
    std::cout << "\tResult[" << i << "]=" << cachedString << std::endl;
    ++i;
  }