Implement Postgres locking #1651

Merged · 16 commits · Sep 5, 2024
4 changes: 3 additions & 1 deletion contributing/AUTOSCALING.md
@@ -1,4 +1,6 @@
dstack features auto-scaling for services published via the gateway. The general flow is:
# Autoscaling

`dstack` features auto-scaling for services published via the gateway. The general flow is:

- STEP 1: `dstack-gateway` parses nginx `access.log` to collect per-second statistics about requests to the service and request times.
- STEP 2: `dstack-gateway` aggregates statistics over a 1-minute window.
8 changes: 5 additions & 3 deletions contributing/GATEWAY.md
@@ -1,8 +1,10 @@
A dstack gateway is a dedicated instance responsible for publishing user applications to the outer internet via the HTTP protocol. One dstack gateway can serve many services, domains, or projects.
# Gateway

A `dstack` gateway is a dedicated instance responsible for publishing user applications to the outer internet via the HTTP protocol. One dstack gateway can serve many services, domains, or projects.

## Gateway creation

Gateways are managed by the dstack server. A gateway is associated with a project and some backend in the project. Users must attach a wildcard domain to the gateway, i.e., all direct subdomains should resolve to the gateway IP address. Since the IP address is unknown during provisioning, dstack doesn't check DNS records.
Gateways are managed by the `dstack` server. A gateway is associated with a project and some backend in the project. Users must attach a wildcard domain to the gateway, i.e., all direct subdomains should resolve to the gateway IP address. Since the IP address is unknown during provisioning, `dstack` doesn't check DNS records.

Provisioning happens as follows:
1. Launch a non-GPU instance (usually the smallest) with all ports exposed.
@@ -25,7 +27,7 @@ The `dstack-gateway` server dumps its internal state to the file `~/dstack/state

## Connection between server and gateway

The dstack server keeps a bidirectional tunnel with each GatewayCompute for the whole uptime of the server.
The `dstack` server keeps a bidirectional tunnel with each GatewayCompute for the whole uptime of the server.

- The tunnel from the server to the gateway is used to manage the gateway: register and unregister services and replicas.
- The tunnel from the gateway to the server is used to authenticate requests to the gateway based on dstack's tokens.
2 changes: 2 additions & 0 deletions contributing/GPUHUNT.md
@@ -1,3 +1,5 @@
# gpuhunt

[`dstackai/gpuhunt`](https://github.com/dstackai/gpuhunt) is a library developed and used for dstack. It implements the unified interface for fetching offers and prices from different cloud providers.

An offer is a possible configuration. It consists of:
88 changes: 88 additions & 0 deletions contributing/LOCKING.md
@@ -0,0 +1,88 @@
# Locking

The `dstack` server supports SQLite and Postgres databases
with two implementations of resource locking to handle concurrent access:

* In-memory locking for SQLite.
* DB-level locking for Postgres.

## SQLite locking

SQLite lacks efficient mechanisms for handling concurrent writes (e.g. SELECT FOR UPDATE),
so `dstack` implements in-memory resource-level locking.
In-memory locking works correctly only under the assumption that there is a single server instance (process),
which is a `dstack` limitation when using SQLite.

The in-memory locking is implemented via locksets.
Locksets are Python sets that store IDs of locked resources.
Concurrent access to locksets is guarded with asyncio locks:

```python
lock, lockset = get_lockset("my_table")
async with lock:
    # select a resource that is not in the lockset
    lockset.add(resource.id)
try:
    process_resource(resource)
finally:
    lockset.remove(resource.id)
```

Locksets are an optimization. One can think of them as per-resource-id locks that
allow independent locking of different resources.
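
For illustration, a locker exposing this API might look like the sketch below. The `get_locker()`/`get_lockset()` names match their usage in this PR's diffs; the implementation details here are an assumption, not the actual `dstack` code.

```python
import asyncio
from typing import Dict, Set, Tuple
from uuid import UUID


class Locker:
    # Sketch: one asyncio.Lock plus one set of locked IDs per namespace
    # (e.g. a table name). Not the actual dstack implementation.
    def __init__(self) -> None:
        self._locks: Dict[str, asyncio.Lock] = {}
        self._locksets: Dict[str, Set[UUID]] = {}

    def get_lockset(self, namespace: str) -> Tuple[asyncio.Lock, Set[UUID]]:
        lock = self._locks.setdefault(namespace, asyncio.Lock())
        lockset = self._locksets.setdefault(namespace, set())
        return lock, lockset


_locker = Locker()


def get_locker() -> Locker:
    return _locker
```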

## Postgres locking

Postgres resource locking is implemented via standard SELECT FOR UPDATE.
SQLAlchemy provides `.with_for_update()`, which has no effect if SELECT FOR UPDATE is not supported, as is the case with SQLite.

A few places rely on advisory locks, such as when generating unique resource names.
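
A minimal sketch of both mechanisms, assuming a hypothetical `MyModel` mapped class and an open `AsyncSession` (`pg_advisory_xact_lock` is a standard Postgres function):

```python
import zlib

from sqlalchemy import select, text

# Row-level lock: SELECT ... FOR UPDATE on the row being modified.
# On SQLite, .with_for_update() is silently ignored.
res = await session.execute(
    select(MyModel).where(MyModel.id == resource_id).with_for_update()
)
resource = res.scalar_one()

# Advisory lock keyed by an integer, e.g. to serialize generation of
# unique resource names. pg_advisory_xact_lock holds the lock until
# the transaction commits or rolls back.
key = zlib.crc32(b"my_project/resource-name")
await session.execute(text("SELECT pg_advisory_xact_lock(:key)"), {"key": key})
```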

## Working with locks

Concurrency is hard. The patterns and gotchas below should make working with locks a bit more manageable.

**A task should acquire locks on resources it modifies**

This is the common-sense approach. The alternative would be the inverse: since job processing cannot run in parallel with run processing, job processing could take the run lock. This indirection complicates things and is discouraged.
r4victor marked this conversation as resolved.
Show resolved Hide resolved


**Start a new transaction after acquiring a lock to see other transactions' changes in SQLite**

```python
# Sketch: MyModel and names are placeholders for a real mapped class
res = await session.execute(select(MyModel.id).where(MyModel.name.in_(names)))
resource_ids = res.scalars().all()
lockset.update(resource_ids)  # lock the IDs
await session.commit()
# The next statement will start a new transaction
res = await session.execute(select(MyModel).where(MyModel.id.in_(resource_ids)))
```

> SQLite exhibits "snapshot isolation". When a read transaction starts, that reader continues to see an unchanging "snapshot" of the database file as it existed at the moment in time when the read transaction started. Any write transactions that commit while the read transaction is active are still invisible to the read transaction, because the reader is seeing a snapshot of database file from a prior moment in time. Source: https://www.sqlite.org/isolation.html

Thus, if a new transaction is not started, you won't see changes that concurrent transactions made before you acquired the lock.

This is not relevant for Postgres since it doesn't rely on in-memory locking (and it also runs at the Read Committed isolation level by default).

**Release in-memory locks only after committing changes**

```python
# Don't do this!
lockset.add(resource.id)
# ... modify the resource ...
lockset.remove(resource.id)  # lock released before the commit!
await session.commit()
```

```python
# Do this!
lockset.add(resource.id)
# ... modify the resource ...
await session.commit()
lockset.remove(resource.id)  # lock released only after the commit
```

If a transaction releases a lock before committing changes,
the changes may not be visible to another transaction that acquired the lock and relies upon seeing all committed changes.

**Don't use `joinedload` when selecting with `.with_for_update()`**

In fact, using `joinedload` together with `.with_for_update()` will trigger an error because `joinedload` produces a LEFT OUTER JOIN, which cannot be used with SELECT FOR UPDATE. A regular `.join()` can be used to lock related resources, but it may return no rows if there is no row to join. Usually, you'd select with `selectinload`, or first select with `.with_for_update()` without loading related attributes and then re-select with `joinedload` without `.with_for_update()`.
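
A sketch of the refetch pattern, with a hypothetical `MyModel` (compare the `_process_fleet` changes below, which do the same with `FleetModel`):

```python
from sqlalchemy import select
from sqlalchemy.orm import joinedload

# 1. Lock the row without loading related attributes.
res = await session.execute(
    select(MyModel).where(MyModel.id == resource_id).with_for_update()
)
instance = res.scalar_one()

# 2. Re-select with joinedload and without FOR UPDATE; populate_existing
#    refreshes the already-loaded instance instead of returning stale data.
res = await session.execute(
    select(MyModel)
    .where(MyModel.id == resource_id)
    .options(joinedload(MyModel.children))
    .execution_options(populate_existing=True)
)
instance = res.unique().scalar_one()
```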
18 changes: 10 additions & 8 deletions contributing/RUNNER-AND-SHIM.md
@@ -1,8 +1,10 @@
dstack runs the user's configuration as a Docker container. The user can specify their own image name or use the preconfigured dstack image (with Python and CUDA).
# runner and shim

`dstack-runner` is a component responsible for setting environment variables and secrets, executing user commands, reporting logs and job status, and terminating the job on signal from the dstack server. `dstack-runner` is cloud-agnostic and runs as an entrypoint of a Docker container.
`dstack` runs the user's configuration as a Docker container. The user can specify their own image name or use the preconfigured dstack image (with Python and CUDA).

If the cloud provider has VM capabilities, dstack runs `dstack-shim` on the host to emulate a container-only environment. `dstack-shim` is responsible for pulling Docker images (public or private), configuring Docker containers (mounts, GPU forwarding, entrypoint, etc.), running Docker containers, and terminating the container on signal from the dstack server.
`dstack-runner` is a component responsible for setting environment variables and secrets, executing user commands, reporting logs and job status, and terminating the job on signal from the `dstack` server. `dstack-runner` is cloud-agnostic and runs as an entrypoint of a Docker container.

If the cloud provider has VM capabilities, `dstack` runs `dstack-shim` on the host to emulate a container-only environment. `dstack-shim` is responsible for pulling Docker images (public or private), configuring Docker containers (mounts, GPU forwarding, entrypoint, etc.), running Docker containers, and terminating the container on signal from the `dstack` server.

## dstack-shim

@@ -15,7 +17,7 @@ If the cloud provider has VM capabilities, dstack runs `dstack-shim` on the host
- Or wait for the interruption signal from the dstack server
- STEP 4: Go to STEP 1

All communication between the dstack server and `dstack-shim` happens via REST API through an SSH tunnel. `dstack-shim` doesn't collect logs. Usually, it is run from a `cloud-init` user-data script.
All communication between the `dstack` server and `dstack-shim` happens via REST API through an SSH tunnel. `dstack-shim` doesn't collect logs. Usually, it is run from a `cloud-init` user-data script.
The entrypoint for the container:
- Installs `openssh-server`
- Adds project and user public keys to `~/.ssh/authorized_keys`
@@ -31,17 +33,17 @@ The entrypoint for the container:
- STEP 3: Prepare the repo (clone git repo and apply the diff, or extract the archive)
- STEP 4: Run the commands from the job spec
- Wait for the commands to exit
- Serve logs to the dstack server via HTTP
- Serve logs to the `dstack` server via HTTP
- Serve real-time logs to the CLI via WebSocket
- Wait for the signal to terminate the commands
- STEP 5: Wait until all logs are read by the server and the CLI. Or exit after a timeout

All communication between the dstack server and `dstack-runner` happens via REST API through an SSH tunnel. `dstack-runner` collects the job logs and its own logs. Only the job logs are served via WebSocket.
All communication between the `dstack` server and `dstack-runner` happens via REST API through an SSH tunnel. `dstack-runner` collects the job logs and its own logs. Only the job logs are served via WebSocket.

## SSH tunnels

dstack expects a running SSH server right next to the `dstack-runner`. It provides a secure channel for communication with the runner API and forwarding any ports without listening for `0.0.0.0`. The `dstack-gateway` also uses this SSH server for forwarding requests from public endpoints.
`dstack` expects a running SSH server right next to the `dstack-runner`. It provides a secure channel for communication with the runner API and forwarding any ports without listening for `0.0.0.0`. The `dstack-gateway` also uses this SSH server for forwarding requests from public endpoints.

`dstack-shim` must also be running next to the SSH server. The dstack server connects to this SSH server for interacting with both `dstack-shim` and `dstack-runner` since we use `host` networking mode for the Docker container. The CLI uses this SSH server as a jump host because the user wants to connect to the container.
`dstack-shim` must also be running next to the SSH server. The `dstack` server connects to this SSH server for interacting with both `dstack-shim` and `dstack-runner` since we use `host` networking mode for the Docker container. The CLI uses this SSH server as a jump host because the user wants to connect to the container.

> `host` networking mode would allow jobs to use any port at any moment for internal communication. For example, during distributed PyTorch training.
23 changes: 4 additions & 19 deletions contributing/RUNS-AND-JOBS.md
@@ -1,6 +1,8 @@
# Runs and jobs

## Introduction

Run is the primary unit of workload in dstack. Users can:
Run is the primary unit of workload in `dstack`. Users can:
1. Submit a run using `dstack run` or the API.
2. Stop a run using `dstack stop` or the API.

@@ -39,7 +41,7 @@ Services' lifecycle has some modifications:
### When can the job be retried?
It's a complicated question and will be elaborated later with multi-node and replica implementation.

For now, dstack retries only if:
For now, `dstack` retries only if:
- The configuration has enabled the retry policy.
- The job failed because of `NO_CAPACITY`, and the instance was a spot.

@@ -70,20 +72,3 @@ Services' jobs lifecycle has some modifications:

## Stop a Run
To stop a run, `services.runs.stop_runs` assigns `TERMINATING` status to the run and executes one iteration of the processing without waiting for the background task.

## Concurrency
Since SQLite lacks per-row locking, we use an in-memory locking mechanism to avoid race conditions.

Every lock consists of a lock primitive (`asyncio.Lock`) and a set of locked IDs (`Set[uuid.UUID]`). It follows the rules below:
- Only the `asyncio.Lock` holder can add an ID to the set.
- The processing task must remove the corresponding ID from the set; acquiring `asyncio.Lock` is not required.

Runs and jobs are processed by concurrent background tasks. There are locks for runs and jobs:
- `PROCESSING_RUNS_(LOCK|IDS)`
- `SUBMITTED_PROCESSING_JOBS_(LOCK|IDS)`
- `RUNNING_PROCESSING_JOBS_(LOCK|IDS)`
- `TERMINATING_PROCESSING_JOBS_(LOCK|IDS)`

Run processing takes priority over job processing; that's why:
- Once `run.id` is in `PROCESSING_RUNS_IDS`, any job processing task should not take any job with `job.run_id` in `PROCESSING_RUNS_IDS`.
- Any run processing task should wait until job processing tasks release all run's job IDs from `*_PROCESSING_JOBS_IDS` sets.
1 change: 1 addition & 0 deletions src/dstack/_internal/cli/commands/server.py
@@ -65,4 +65,5 @@ def _command(self, args: Namespace):
port=args.port,
reload=version.__version__ is None,
log_level=uvicorn_log_level,
workers=1,
)
25 changes: 18 additions & 7 deletions src/dstack/_internal/server/background/__init__.py
@@ -25,14 +25,25 @@ def get_scheduler() -> AsyncIOScheduler:


def start_background_tasks() -> AsyncIOScheduler:
_scheduler.add_job(process_submitted_jobs, IntervalTrigger(seconds=2))
_scheduler.add_job(process_running_jobs, IntervalTrigger(seconds=2))
_scheduler.add_job(process_terminating_jobs, IntervalTrigger(seconds=2))
_scheduler.add_job(process_instances, IntervalTrigger(seconds=10))
_scheduler.add_job(process_runs, IntervalTrigger(seconds=1))
# In-memory locking via locksets does not guarantee
# that the first waiting for the lock will acquire it.
# The jitter is needed to give all tasks a chance to acquire locks.
_scheduler.add_job(
process_submitted_jobs, IntervalTrigger(seconds=4, jitter=2), max_instances=5
)
_scheduler.add_job(process_running_jobs, IntervalTrigger(seconds=4, jitter=2), max_instances=5)
_scheduler.add_job(
process_terminating_jobs, IntervalTrigger(seconds=4, jitter=2), max_instances=5
)
_scheduler.add_job(process_instances, IntervalTrigger(seconds=4, jitter=2), max_instances=5)
_scheduler.add_job(process_runs, IntervalTrigger(seconds=2), max_instances=5)
_scheduler.add_job(process_gateways_connections, IntervalTrigger(seconds=15))
_scheduler.add_job(process_submitted_gateways, IntervalTrigger(seconds=10), max_instances=5)
_scheduler.add_job(process_submitted_volumes, IntervalTrigger(seconds=5))
_scheduler.add_job(
process_submitted_gateways, IntervalTrigger(seconds=10, jitter=2), max_instances=5
)
_scheduler.add_job(
process_submitted_volumes, IntervalTrigger(seconds=10, jitter=2), max_instances=5
)
_scheduler.add_job(process_fleets, IntervalTrigger(seconds=15))
_scheduler.start()
return _scheduler
54 changes: 24 additions & 30 deletions src/dstack/_internal/server/background/tasks/process_fleets.py
@@ -1,5 +1,3 @@
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
@@ -8,57 +6,55 @@
from dstack._internal.server.db import get_session_ctx
from dstack._internal.server.models import FleetModel
from dstack._internal.server.services.fleets import (
PROCESSING_FLEETS_IDS,
PROCESSING_FLEETS_LOCK,
fleet_model_to_fleet,
is_fleet_empty,
is_fleet_in_use,
)
from dstack._internal.server.services.locking import get_locker
from dstack._internal.utils.common import get_current_datetime
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)


async def process_fleets():
lock, lockset = get_locker().get_lockset(FleetModel.__tablename__)
async with get_session_ctx() as session:
async with PROCESSING_FLEETS_LOCK:
async with lock:
res = await session.execute(
select(FleetModel)
.where(
FleetModel.deleted == False,
FleetModel.id.not_in(PROCESSING_FLEETS_IDS),
FleetModel.id.not_in(lockset),
)
.order_by(FleetModel.last_processed_at.asc())
.limit(1)
.with_for_update(skip_locked=True)
)
fleet_model = res.scalar()
if fleet_model is None:
return

PROCESSING_FLEETS_IDS.add(fleet_model.id)

try:
fleet_model_id = fleet_model.id
await _process_fleet(fleet_id=fleet_model_id)
finally:
PROCESSING_FLEETS_IDS.remove(fleet_model_id)
lockset.add(fleet_model.id)
try:
fleet_model_id = fleet_model.id
await _process_fleet(session=session, fleet_model=fleet_model)
finally:
lockset.difference_update([fleet_model_id])


async def _process_fleet(fleet_id: UUID):
async with get_session_ctx() as session:
res = await session.execute(
select(FleetModel)
.where(FleetModel.id == fleet_id)
.options(joinedload(FleetModel.project))
.options(joinedload(FleetModel.instances))
.options(joinedload(FleetModel.runs))
)
fleet_model = res.unique().scalar_one()
await _autodelete_fleet(
session=session,
fleet_model=fleet_model,
)
async def _process_fleet(session: AsyncSession, fleet_model: FleetModel):
# Refetch to load related attributes.
# joinedload produces LEFT OUTER JOIN that can't be used with FOR UPDATE.
res = await session.execute(
select(FleetModel)
.where(FleetModel.id == fleet_model.id)
.options(joinedload(FleetModel.project))
.options(joinedload(FleetModel.instances))
.options(joinedload(FleetModel.runs))
.execution_options(populate_existing=True)
)
fleet_model = res.unique().scalar_one()
await _autodelete_fleet(session=session, fleet_model=fleet_model)


async def _autodelete_fleet(session: AsyncSession, fleet_model: FleetModel):
@@ -74,10 +70,8 @@ async def _autodelete_fleet(session: AsyncSession, fleet_model: FleetModel):
return

logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name)

fleet_model.status = FleetStatus.TERMINATED
fleet_model.deleted = True
fleet_model.last_processed_at = get_current_datetime()
await session.commit()

logger.info("Fleet %s deleted", fleet_model.name)