
Adds support for multiple managers running distributed fate #6168

Open
keith-turner wants to merge 15 commits into apache:main from keith-turner:dist-fate3

Conversation

@keith-turner
Contributor

Lays the foundation for multiple managers with the following changes. The best place to start looking at these changes is the Manager.run() method, which sets everything up and ties it all together.

  • Each manager process now acquires two zookeeper locks: a primary lock and an assistant lock. Only one manager process can obtain the primary lock, and when it does it assumes the role of primary manager. All manager processes acquire an assistant lock, which is similar to a tserver or compactor lock. The assistant lock advertises the manager process as being available to other Accumulo processes to handle assistant manager operations.
  • Manager processes have a single thrift server, and the thrift services hosted on that server are categorized into primary manager and assistant manager services. When an assistant manager receives an RPC for a primary manager thrift service, it will not execute the request; it will either throw an error or ignore it.
  • The primary manager process delegates manager responsibility via RPCs to assistant managers.
  • Any management responsibility not delegated runs on the primary manager.
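
The primary-only service check described above can be sketched as a small guard. This is a hypothetical illustration: `PrimaryServiceGuard` and `NotPrimaryException` are invented names for this sketch, not Accumulo's actual API.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class PrimaryServiceGuard {

  /** Thrown when a primary-only RPC arrives at an assistant manager (illustrative name). */
  public static class NotPrimaryException extends RuntimeException {
    public NotPrimaryException(String msg) {
      super(msg);
    }
  }

  private final BooleanSupplier holdsPrimaryLock;

  public PrimaryServiceGuard(BooleanSupplier holdsPrimaryLock) {
    this.holdsPrimaryLock = holdsPrimaryLock;
  }

  /** Runs the RPC handler only if this process currently holds the primary lock. */
  public <T> T callPrimaryOnly(Supplier<T> handler) {
    if (!holdsPrimaryLock.getAsBoolean()) {
      // Assistant managers refuse primary-only requests rather than executing them.
      throw new NotPrimaryException("this manager is an assistant; retry against the primary");
    }
    return handler.get();
  }
}
```

In this sketch an assistant-only process would construct the guard with a supplier that returns false, so every primary-only call fails fast instead of partially executing.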

Using the changes above, fate is now distributed across all manager processes. In the future these changes should make it easy to delegate other responsibilities to assistant managers. The following is an outline of the fate changes.

  • New FateWorker class. This runs in every manager and handles requests from the primary manager to adjust what range of the fate table it is currently responsible for. FateWorker implements a new thrift service used to assign it ranges.
  • New FateManager class that is run by the primary manager and is responsible for partitioning fate processing across all assistant managers. As manager processes come and go this will repartition the fate table evenly across all available managers. The FateManager communicates with FateWorkers via thrift.
  • Some new RPCs for best effort notifications. Before these changes there were in-memory notification systems that made the manager more responsive; for example, they would allow a fate operation to signal the Tablet Group Watcher to take action sooner. FateWorkerEnv sends these notifications to the primary manager over a new RPC. It does not matter if they are lost; things will still eventually happen.
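
The even repartitioning that the FateManager performs can be illustrated with a minimal sketch. The bucket-based keyspace, the `Partition` record, and all names here are assumptions for illustration, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class FatePartitioner {

  /** A contiguous slice of the fate keyspace (illustrative type). */
  public record Partition(int startInclusive, int endExclusive) {}

  /** Evenly splits [0, buckets) into one contiguous range per available manager. */
  public static List<Partition> partition(int buckets, int managers) {
    List<Partition> parts = new ArrayList<>();
    int base = buckets / managers;
    int extra = buckets % managers; // the first `extra` managers get one more bucket
    int start = 0;
    for (int i = 0; i < managers; i++) {
      int size = base + (i < extra ? 1 : 0);
      parts.add(new Partition(start, start + size));
      start += size;
    }
    return parts;
  }
}
```

Under this sketch, when a manager process comes or goes the primary would recompute the partitions for the new manager count and push the updated ranges to each FateWorker over thrift.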

Other than fate, the primary manager process does everything the current manager does. This change pulls from #3262 and #6139.


Co-authored-by: Dave Marion <dlmarion@apache.org>
@keith-turner
Contributor Author

Worked up a design document at https://cwiki.apache.org/confluence/display/ACCUMULO/Multiple+Managers+Foundation

Pulled most of the text from that into the commit message.

Contributor

@dlmarion dlmarion left a comment

First pass just looking at multiple manager changes, not looking at fate changes.


metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
getAdvertiseAddress(), getResourceGroup()));

Contributor

Do we want to wait here for some minimum set of Managers before proceeding like we do for TabletServers? Wondering if it might reduce some churn in the FateManager at startup. I have the code for this already in #3262 in Manager at line 1057.

Contributor Author

That seems like a good change, could be done in a follow on PR

Contributor Author

Opened #6186

metricsInfo
.addMetricsProducers(fateWorker.getMetricsProducers().toArray(new MetricsProducer[0]));

metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
Contributor

Something to address later, how can we add a tag to the metrics to denote whether this Manager is primary or not.

Contributor Author

In addition to that, we probably need some command line tools to show information about manager processes. Maybe that could be an update to the service status command. Would be nice to see which manager is primary and for the non primary ones what has been delegated to them. Could also show this information on the monitor.

@keith-turner
Contributor Author

keith-turner commented Mar 5, 2026

Did some testing on a single machine using uno. Found I can start multiple managers with the following command.

ACCUMULO_CLUSTER_ARG=5 accumulo-service manager start

Found one bug during this testing where the fate operation that commits a compaction was not generating a notification. Fixed that in 365b833. Other than that, the testing went well. Running a user compaction from the shell for a table with a single tablet takes 2 seconds; most of that time seems to be spent waiting for the compactor to pick up a job. For this simple test I saw the following in the logs.

  1. Seeded a fate operation to drive the user compaction. Saw in the logs it notified the remote manager assigned that range of the fate table.
  2. The fate operation started because of the notification and wrote to the metadata table that the tablet needs compaction. Saw in the logs it notified the TGW via an RPC to the primary manager.
  3. The TGW ran because of the notification, saw the metadata entry, and queued a compaction job.
  4. Compactor eventually ran the job and seeded a fate operation to commit it. Saw in the logs it notified the remote manager assigned that range of the fate table.
  5. The fate operation to commit ran because of the notification
  6. The fate operation driving the user compaction finished.
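
The fire-and-forget notifications seen in steps 1, 2, and 4 can be sketched as follows. `BestEffortNotifier` and its `Rpc` interface are illustrative names for this sketch, not the real code; the key property is that a lost notification only delays work, since the receiver also discovers it by periodic scanning.

```java
public class BestEffortNotifier {

  /** Minimal stand-in for a thrift notification RPC (illustrative interface). */
  public interface Rpc {
    void send(String message) throws Exception;
  }

  private final Rpc rpc;

  public BestEffortNotifier(Rpc rpc) {
    this.rpc = rpc;
  }

  /** Attempts the notification once; failures are dropped, never retried. */
  public boolean notify(String message) {
    try {
      rpc.send(message);
      return true;
    } catch (Exception e) {
      // Safe to ignore: the periodic scan will eventually pick up the work anyway.
      return false;
    }
  }
}
```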

return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock);
}

public Set<ServiceLockPath> getManagerAssistants(ResourceGroupPredicate resourceGroupPredicate,
Contributor

In the method above the name is 'AssistantManager' and here it is ManagerAssistants. I think we should be consistent in the naming.

Is it the case that clients will only always connect to the primary manager, so that is why it's called Manager?

Contributor Author

Improved the naming consistency in 0556afa

Is it the case that clients will only always connect to the primary manager, so that is why it's called Manager?

Currently clients only talk to the primary manager. In a follow-on I would like to create a new thrift service, maybe called AssistantManagerClientService, that is not wrapped with HighlyAvailableService, and move some things from the current manager client service to it. Could also explore making it so the fate thrift client service could be called on any manager. That would need client-side changes similar to what was done in #3262.

Speaking of naming, I am going to rename HighlyAvailableService to PrimaryManagerService or something like that. Its naming does not match its purpose very well.

Contributor Author

Opened #6183

return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock);
}

public Set<ServiceLockPath> getManagerAssistants(ResourceGroupPredicate resourceGroupPredicate,
Contributor

You should be able to remove the resource group predicate here and use the DEFAULT_RG_ONLY predicate. Or, maybe this method can be removed in favor of the one below.

Contributor Author

Changed in 0556afa


// Start two more managers
getCluster().exec(Manager.class);
getCluster().exec(Manager.class);
Contributor

Should this wait until all are up and reporting in ZK before continuing?

Contributor Author

That's a good change, made it in 0556afa

return Math.max(1, deadline - System.currentTimeMillis());
}

private void getManagerLock() throws KeeperException, InterruptedException {
Contributor

Suggested change
private void getManagerLock() throws KeeperException, InterruptedException {
private void getManagerAssistantLock() throws KeeperException, InterruptedException {

Contributor Author

Changed in 0556afa

var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
.scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));
managerMetrics.configureFateMetrics(getConfiguration(), this);
Contributor

Might be useful to comment in the run method that this method must be called before registering the managerMetrics producer for maintenance purposes.

Contributor Author

Made a different change w/ the same goal in d74a34c

Contributor Author

Bumped into a few problems w/ the organization of the manager metrics code when making some of these changes. Opened #6181 to improve it. The comment about setup order is one of those problems and that comes from the setup code being spread all over the place.

Contributor

I also created #6182 because I noticed something metrics related while reviewing this.
