Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dask_remote_jobqueue/dask_remote_jobqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ def __init__(
self,
ssh_namespace="default",
user: str = "NONE",
ssh_url: str = "",
ssh_url_port: int = 8122,
asynchronous: bool = True, # Set by dask-labextension but not used in this class
sitename: str = "",
singularity_wn_image = "/cvmfs/images.dodas.infn.it/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1",
debug: bool = True,
user_cores: int = 1,
user_memory: str = "2 GiB",
):
self.__debug = debug

Expand Down Expand Up @@ -112,6 +115,7 @@ def __init__(
logger.debug(f"generated -> controller_port: {self.controller_port}")

# Custom ssh port for the tunnel
self.ssh_url: str = ssh_url
self.ssh_url_port: int = ssh_url_port

self.cluster_id: str = ""
Expand Down Expand Up @@ -148,6 +152,10 @@ def __init__(
self.client_id = os.environ.get("IAM_CLIENT_ID")
self.client_secret = os.environ.get("IAM_CLIENT_SECRET")

# Dask worker spec
self.user_cores: int = user_cores
self.user_memory: str = user_memory

##
# Dask labextension variables
#
Expand Down Expand Up @@ -422,6 +430,7 @@ async def _make_connections(self):
self.connection_process_q,
cluster_id=self.cluster_id,
ssh_namespace=self.ssh_namespace,
ssh_url=self.ssh_url,
ssh_url_port=self.ssh_url_port,
username=self.username,
token=self.token,
Expand Down
4 changes: 2 additions & 2 deletions dask_remote_jobqueue/templates/job_rm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT

source /cvmfs/cms.dodas.infn.it/miniconda3/etc/profile.d/conda.sh
conda activate af
#source /cvmfs/cms.dodas.infn.it/miniconda3/etc/profile.d/conda.sh
#conda activate af

export _condor_AUTH_SSL_CLIENT_CAFILE={{ htc_ca }}
export _condor_TOOL_DEBUG={{ htc_debug }}
Expand Down
5 changes: 3 additions & 2 deletions dask_remote_jobqueue/templates/job_submit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT

source /cvmfs/cms.dodas.infn.it/miniconda3/etc/profile.d/conda.sh
conda activate af
#source /cvmfs/cms.dodas.infn.it/miniconda3/etc/profile.d/conda.sh
#conda activate af

export _condor_AUTH_SSL_CLIENT_CAFILE={{ htc_ca }}
export _condor_TOOL_DEBUG={{ htc_debug }}
Expand All @@ -15,4 +15,5 @@ export _condor_SCHEDD_NAME={{ htc_schedd_name }}
export _condor_SCITOKENS_FILE={{ htc_scitoken_file }}
export _condor_SEC_DEFAULT_AUTHENTICATION_METHODS={{ htc_sec_method}}

cat $@
condor_submit -spool $@
17 changes: 9 additions & 8 deletions dask_remote_jobqueue/templates/scheduler.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

chmod +x job_submit.sh
chmod +x job_rm.sh
chmod +x entrypoint.sh
#chmod +x entrypoint.sh

while true; do
curl -d grant_type=urn:ietf:params:oauth:grant-type:token-exchange \
Expand All @@ -20,11 +20,12 @@ while true; do
sleep 72000
done &

source /cvmfs/cms.dodas.infn.it/miniconda3/bin/activate
conda activate af-test
#source /cvmfs/cms.dodas.infn.it/miniconda3/bin/activate
#conda activate af-test

if command -V tini &>/dev/null; then
tini -s python3 -- start_scheduler.py
else
python3 start_scheduler.py
fi
#if command -V tini &>/dev/null; then
# tini -s python3 -- start_scheduler.py
#else
# python3 start_scheduler.py
#fi
python3 start_scheduler.py
5 changes: 3 additions & 2 deletions dask_remote_jobqueue/templates/scheduler.sub
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ executable = scheduler.sh
log = dask_scheduler.log
output = dask_scheduler.out
error = dask_scheduler.error
+SingularityImage = "/cvmfs/images.dodas.infn.it/registry.hub.docker.com/dodasts/dask-scheduler:v1"
#+SingularityImage = "/cvmfs/unpacked.cern.ch/registry.hub.docker.com/dodasts/dask-scheduler:v1"
+SingularityImage = {{ singularity_wn_image }}
{{ selected_sitename }}
transfer_input_files = start_scheduler.py, scheduler.sh, /ca.crt, /tmp/token, job_submit.sh, job_rm.sh, .bashrc, config.yaml
environment = HOME=./; OIDC_CONFIG_DIR=./.oidc-agent; JHUB_TOKEN={{ token }};JHUB_USER={{ name }};SCHED_PORT={{ sched_port }};DASH_PORT={{ dash_port }};CONTROLLER_PORT={{ controller_port }};REFRESH_TOKEN={{ refresh_token }};IAM_SERVER={{ iam_server }};IAM_CLIENT_ID={{ client_id }};IAM_CLIENT_SECRET={{ client_secret }};_condor_AUTH_SSL_CLIENT_CAFILE={{ htc_ca }};_condor_TOOL_DEBUG={{ htc_debug }};_condor_COLLECTOR_HOST={{ htc_collector }}; _condor_SCHEDD_HOST={{ htc_schedd_host }};_condor_SCHEDD_NAME={{ htc_schedd_name }};_condor_SCITOKENS_FILE={{ htc_scitoken_file }};_condor_SEC_DEFAULT_AUTHENTICATION_METHODS={{ htc_sec_method}};SINGULARITY_WN_IMAGE={{ singularity_wn_image }}
environment = HOME=./; OIDC_CONFIG_DIR=./.oidc-agent; JHUB_TOKEN={{ token }};JHUB_USER={{ name }};SCHED_PORT={{ sched_port }};DASH_PORT={{ dash_port }};CONTROLLER_PORT={{ controller_port }};REFRESH_TOKEN={{ refresh_token }};IAM_SERVER={{ iam_server }};IAM_CLIENT_ID={{ client_id }};IAM_CLIENT_SECRET={{ client_secret }};_condor_AUTH_SSL_CLIENT_CAFILE={{ htc_ca }};_condor_TOOL_DEBUG={{ htc_debug }};_condor_COLLECTOR_HOST={{ htc_collector }}; _condor_SCHEDD_HOST={{ htc_schedd_host }};_condor_SCHEDD_NAME={{ htc_schedd_name }};_condor_SCITOKENS_FILE={{ htc_scitoken_file }};_condor_SEC_DEFAULT_AUTHENTICATION_METHODS={{ htc_sec_method}};SINGULARITY_WN_IMAGE={{ singularity_wn_image }};USER_CORES={{ user_cores }};USER_MEMORY={{ user_memory }}
queue
36 changes: 19 additions & 17 deletions dask_remote_jobqueue/templates/start_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ def __init__(self, *args, **kwargs):
**kwargs,
death_timeout=60 * 5, # 5min
python="python3",
#submit_command_extra = "-spool"
)
self.submit_command = "./job_submit.sh"
self.cancel_command = "./job_rm.sh"
self.executable = "/bin/bash"


##
Expand Down Expand Up @@ -81,7 +85,12 @@ def __init__(self, *args, **kwargs):
sched_port = int(os.environ.get("SCHED_PORT", "42000"))
dash_port = int(os.environ.get("DASH_PORT", "42001"))
controller_port = int(os.environ.get("CONTROLLER_PORT", "42002"))
singularity_wn_image = os.environ.get("SINGULARITY_WN_IMAGE", "/cvmfs/images.dodas.infn.it/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1")
singularity_wn_image = os.environ.get("SINGULARITY_WN_IMAGE", "/cvmfs/unpacked.cern.ch/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1")

user_cores = int(os.environ.get("USER_CORES", 1))
user_memory = os.environ.get("USER_MEMORY", "2 GiB")
if user_memory == "":
user_memory = "2 GiB"

logger.debug(f"name: {name}")
logger.debug(f"token: {token}")
Expand Down Expand Up @@ -173,13 +182,16 @@ def _worker_spec(self) -> dict:
def run(self):
self.cluster = HTCondorCluster(
job_cls=MyHTCondorJob,
cores=1,
memory="2 GiB", # ref: https://github.com/dask/dask/blob/e4799c0498b5e5877705bb5542d8d01116ee1320/dask/utils.py#L1404
cores = user_cores,
memory = user_memory,
#cores=1,
#memory="2 GiB", # ref: https://github.com/dask/dask/blob/e4799c0498b5e5877705bb5542d8d01116ee1320/dask/utils.py#L1404
disk="1 GB",
scheduler_options=scheduler_options_vars,
job_extra=job_extra_vars,
# silence_logs="debug",
local_directory="./scratch",
job_script_prologue=['eval "$(conda shell.bash hook)"']
)

while self.cluster.status != Status.running:
Expand Down Expand Up @@ -296,7 +308,7 @@ def run(self):
async def tunnel_scheduler():
logger.debug("start tunnel scheduler")
connection = await asyncssh.connect(
"jhub.131.154.96.124.myip.cloud.infn.it",
"jhub.131.154.98.185.myip.cloud.infn.it",
port=31022,
username=name,
password=token,
Expand All @@ -311,7 +323,7 @@ async def tunnel_scheduler():
async def tunnel_dashboard():
logger.debug("start tunnel dashboard")
connection = await asyncssh.connect(
"jhub.131.154.96.124.myip.cloud.infn.it",
"jhub.131.154.98.185.myip.cloud.infn.it",
port=31022,
username=name,
password=token,
Expand All @@ -326,7 +338,7 @@ async def tunnel_dashboard():
async def tunnel_controller():
logger.debug("start tunnel controller")
connection = await asyncssh.connect(
"jhub.131.154.96.124.myip.cloud.infn.it",
"jhub.131.154.98.185.myip.cloud.infn.it",
port=31022,
username=name,
password=token,
Expand Down Expand Up @@ -389,7 +401,7 @@ def get(self):
def prepare(self):
logger.debug(self.request.arguments)


class LogsHandler(tornado.web.RequestHandler):
def initialize(self, sched_q: Queue, controller_q: Queue):
self.sched_q: Queue = sched_q
Expand Down Expand Up @@ -424,22 +436,18 @@ async def get(self):
font-size: 15px;
border-bottom:
}

.active, .collapsible:hover {
background-color: #ec8f72;
}

.content {
padding: 0 18px;
display: none;
overflow: hidden;
background-color: #fafafa;
}

table, th, td {
border: 1px solid black;
}

table {
width: 100%;
}
Expand Down Expand Up @@ -545,7 +553,6 @@ async def get(self):
"""<script>
var coll = document.getElementsByClassName("collapsible");
var i;

for (i = 0; i < coll.length; i++) {
coll[i].addEventListener("click", function() {
this.classList.toggle("active");
Expand All @@ -557,20 +564,16 @@ async def get(self):
}
});
}

window.onscroll = function() {myFunction()};

var header = document.getElementById("myHeader");
var sticky = header.offsetTop;

function myFunction() {
if (window.pageYOffset > sticky) {
header.classList.add("sticky");
} else {
header.classList.remove("sticky");
}
}

var origin_location = window.location.href;
function reload() {
window.location.href = origin_location;
Expand Down Expand Up @@ -666,7 +669,6 @@ def initialize(self, sched_q: Queue, controller_q: Queue):

def get(self):
"""Return a descriptive dictionary of worker specs.

Example worker_spec:
{
"HTCondorCluster-0": {
Expand Down
24 changes: 21 additions & 3 deletions dask_remote_jobqueue/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ def __init__(
asyncio.set_event_loop(self.cur_loop)
self.cluster_id: str = cluster_id
self.ssh_namespace: str = ssh_namespace
self.ssh_url: str = f"ssh-listener.{self.ssh_namespace}.svc.cluster.local"
if ssh_url:
self.ssh_url: str = ssh_url
else:
self.ssh_url: str = f"ssh-listener.{self.ssh_namespace}.svc.cluster.local"
self.ssh_url_port: int = ssh_url_port
self.username: str = username
self.token: str = token
Expand Down Expand Up @@ -103,7 +106,7 @@ def run(self):
connected = self._connection_ok(1)
logger.debug(f"[ConnectionManager][attempt: {attempt}][{connected}]")

if attempt >= 42:
if attempt >= 10000:
self.connection_manager_q.put(
f"ERROR - ATTEMPT TO CONNECT EXCEEDED # {attempt}"
)
Expand Down Expand Up @@ -412,6 +415,9 @@ def __init__(
self._htc_scitoken_file: str = ""
self._htc_sec_method: str = ""

self._user_cores: int = 1
self._user_memory: str = "2 GiB"

def _copy_attributes(self):
try:
self._sitename = getattr(self._remoteHTCondor, "sitename")
Expand Down Expand Up @@ -468,6 +474,16 @@ def _copy_attributes(self):
logger.debug(
f"[StartDaskScheduler][copy of htc_sec_method: {self._htc_sec_method}]"
)
self._user_cores = getattr(self._remoteHTCondor, "user_cores")
logger.debug(
f"[StartDaskScheduler][copy of user_cores: {self._user_cores}]"
)
self._user_memory = getattr(self._remoteHTCondor, "user_memory")
logger.debug(
f"[StartDaskScheduler][copy of user_memory: {self._user_memory}]"
)


except AttributeError as exc:
logger.debug(f"[StartDaskScheduler][copy error: {exc}]")
raise
Expand Down Expand Up @@ -516,7 +532,9 @@ def run(self):
htc_scitoken_file=self._htc_scitoken_file,
htc_sec_method=self._htc_sec_method,
selected_sitename=selected_sitename,
singularity_wn_image=self.singularity_wn_image
singularity_wn_image=self.singularity_wn_image,
user_cores=self._user_cores,
user_memory=self._user_memory
)

logger.debug(f"[StartDaskScheduler][run][{dest.name}]")
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
asyncssh
dask==2021.11.1
dask
dask_jobqueue
httpx
jinja2
Expand Down
21 changes: 21 additions & 0 deletions tests/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from dask_remote_jobqueue import RemoteHTCondor

def main():
    """Smoke-test the remote jobqueue: start a RemoteHTCondor cluster,
    print its scheduler info, and shut it down.

    The connection parameters below are deployment-specific
    (cms-it-hub at CNAF) — adjust them for other sites.
    """
    cluster = RemoteHTCondor(
        user="ttedesch",
        ssh_url="cms-it-hub.cloud.cnaf.infn.it",
        ssh_url_port=31023,
        sitename="T2_LNL_PD",
        singularity_wn_image="/cvmfs/unpacked.cern.ch/registry.hub.docker.com/dodasts/root-in-docker:ubuntu22-kernel-v1",
        asynchronous=False,
        debug=False,
    )

    cluster.start()
    try:
        # Printing scheduler_info confirms the scheduler endpoint
        # was actually reached after start-up.
        print(cluster.scheduler_info)
    finally:
        # Ensure the remote cluster is torn down even if start-up
        # succeeded but the info read raised.
        cluster.close()

# Script entry point (PEP 8 spacing around the comparison operator).
if __name__ == "__main__":
    main()
Loading