Sunday, March 22, 2026

Ray: Distributed Computing for All, Part 1


This is the first in a two-part series on distributed computing using Ray. This part shows how to use Ray on your local PC, and part 2 shows how to scale Ray to multi-server clusters in the cloud.

Imagine you've just gotten a brand-new 16-core laptop or desktop, and you're keen to test its power with some heavy computations.

You're a Python programmer, though not an expert yet, so you open your favourite LLM and ask it something like this.

"I want to count the number of prime numbers within a given input range. Please give me some Python code for this."

After a few seconds, the LLM gives you some code. You might tweak it a bit through a short back-and-forth, and eventually you end up with something like this:

import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1

    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks

    print(f"CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")

You run the program and it works perfectly. The only problem is that it takes quite a bit of time to run, maybe thirty to sixty seconds, depending on the size of your input range. That's probably unacceptable.

What do you do now? You have several options, with the three most common probably being:
– Parallelise the code using threads or multiprocessing
– Rewrite the code in a "fast" language like C or Rust
– Try a library like Cython, Numba or NumPy

These are all viable options, but each has disadvantages. Options 1 and 3 significantly increase your code complexity, and the middle option may require you to learn a new programming language.
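For a flavour of option 1, here's a minimal sketch of the same prime count using the standard library's multiprocessing module (a hand-rolled illustration, not the approach we'll take; the chunking mirrors the LLM-generated code above):

```python
import math
from multiprocessing import Pool

def is_prime(n: int) -> bool:
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

def count_primes(bounds: tuple) -> int:
    # Count primes in the half-open range [a, b)
    a, b = bounds
    return sum(1 for n in range(a, b) if is_prime(n))

if __name__ == "__main__":
    A, B, chunks = 10_000_000, 10_080_000, 8
    step = (B - A) // chunks
    # Split [A, B) into spans; the last span absorbs any remainder
    spans = [(A + i * step, A + (i + 1) * step if i < chunks - 1 else B)
             for i in range(chunks)]
    with Pool() as pool:  # one worker process per core by default
        total = sum(pool.map(count_primes, spans))
    print(total)
```

It works, but you're now managing process pools, pickling, and chunk boundaries yourself, and that plumbing grows with the problem.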

What if I told you there was another way? One where the changes required to your existing code are kept to an absolute minimum. One where your runtime is automatically spread across all your available cores.

That's precisely what the third-party Ray library promises to do.

What is Ray?

The Ray Python library is an open-source distributed computing framework designed to make it easy to scale Python programs from a laptop to a cluster with minimal code changes.

Ray makes it simple to scale and distribute compute-intensive application workloads, from deep learning to data processing, across clusters of remote computers, while also delivering real runtime improvements on your laptop, desktop, or even a remote cloud-based compute cluster.

Ray provides a rich set of libraries and integrations built on a flexible distributed execution framework, making distributed computing easy and accessible to all.

In short, Ray lets you parallelise and distribute your Python code with minimal effort, whether it's running locally on a laptop or on an enormous cloud-based cluster.

Using Ray

In the rest of this article, I'll take you through the basics of using Ray to speed up CPU-intensive Python code, and we'll work through some example code snippets to show how easy it is to incorporate the power of Ray into your own workloads.

To get the most out of Ray, particularly if you're a data scientist or machine learning engineer, there are a few key concepts to understand first. Ray is made up of several components.

Ray Data is a scalable library designed for data processing in ML and AI tasks. It offers flexible, high-performance APIs for AI workloads, including batch inference, data preprocessing, and data ingestion for ML training.

Ray Train is a flexible, scalable library designed for distributed machine learning training and fine-tuning.

Ray Tune is used for hyperparameter tuning.

Ray Serve is a scalable library for deploying models behind online inference APIs.

Ray RLlib is used for scalable reinforcement learning.

As you can see, Ray is very focused on large language models and AI applications, but there's one final important component I haven't mentioned yet, and it's the one I'll be using in this article.

Ray Core is designed for scaling CPU-intensive, general-purpose Python applications. It spreads your Python workload over all available cores on whichever system you're running it on.

This article will deal exclusively with Ray Core.

Two essential concepts to understand within Ray Core are tasks and actors.

Tasks are stateless workers or services, implemented in Ray by decorating regular Python functions.

Actors (or stateful workers) are used, for example, when you need to keep track of and maintain shared state across your distributed cluster. Actors are implemented by decorating regular Python classes.

Both actors and tasks are defined using the same @ray.remote decorator. Once defined, tasks are executed with the special .remote() method provided by Ray. We'll look at an example of this next.

Setting up a development environment

Before we start coding, we should set up a development environment to keep our projects siloed so they don't interfere with each other. I'll be using conda for this, but feel free to use whichever tool you prefer. I'll be running my code in a Jupyter notebook in a WSL2 Ubuntu shell on Windows.

$ conda create -n ray-test python=3.13 -y
$ conda activate ray-test
(ray-test) $ pip install -U "ray[default]"

Code example - counting prime numbers

Let's revisit the example I gave at the start: counting the number of primes in the interval 10,000,000 to 20,000,000.

We'll run our original Python code and time how long it takes.

import math, time, os

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = os.cpu_count() or 1

    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks

    print(f"CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    results = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        results.append(count_primes(s, e))
    total = sum(results)
    print(f"total={total}, time={time.time() - t0:.2f}s")

And the output?

CPUs~32, chunks=64
total=606028, time=31.17s

Now, can we improve on that using Ray? Yes, by following this simple four-step process.

Step 1 - Initialise Ray. Add these two lines at the start of your code.

import ray

ray.init()

Step 2 - Create our remote function. That's easy. Just decorate the function we want to optimise with the @ray.remote decorator. The function to decorate is the one doing most of the work. In our example, that's the count_primes function.

@ray.remote(num_cpus=1)
def count_primes(start: int, end: int) -> int:
    ...
    ...

Step 3 - Launch the parallel tasks. Call your remote function using the .remote() method Ray attaches to it.

refs.append(count_primes.remote(s, e))

Step 4 - Wait for all our tasks to complete. Each Ray task returns an ObjectRef as soon as it's called. This is a promise from Ray: the task has been set off running remotely, and Ray will deliver its value at some point in the future. We collect the results behind all the ObjectRefs with the ray.get() function, which blocks until every task has completed.

results = ray.get(refs)

Let's put this all together. As you will see, the changes to our original code are minimal: just four lines of code added, plus a print statement to display the number of nodes and cores we're running on.

import math
import time

# -----------------------------------------
# Change No. 1
# -----------------------------------------
import ray
ray.init()

def is_prime(n: int) -> bool:
    if n < 2: return False
    if n == 2: return True
    if n % 2 == 0: return False
    r = int(math.isqrt(n)) + 1
    for i in range(3, r, 2):
        if n % i == 0:
            return False
    return True

# -----------------------------------------
# Change No. 2
# -----------------------------------------
@ray.remote(num_cpus=1)  # pure-Python loop → 1 CPU per task
def count_primes(a: int, b: int) -> int:
    c = 0
    for n in range(a, b):
        if is_prime(n):
            c += 1
    return c

if __name__ == "__main__":
    A, B = 10_000_000, 20_000_000
    total_cpus = int(ray.cluster_resources().get("CPU", 1))

    # Start "chunky"; we can sweep this later
    chunks = max(4, total_cpus * 2)
    step = (B - A) // chunks

    print(f"nodes={len(ray.nodes())}, CPUs~{total_cpus}, chunks={chunks}")
    t0 = time.time()
    refs = []
    for i in range(chunks):
        s = A + i * step
        e = s + step if i < chunks - 1 else B
        # -----------------------------------------
        # Change No. 3
        # -----------------------------------------
        refs.append(count_primes.remote(s, e))

    # -----------------------------------------
    # Change No. 4
    # -----------------------------------------
    total = sum(ray.get(refs))

    print(f"total={total}, time={time.time() - t0:.2f}s")

Now, has it all been worthwhile? Let's run the new code and see what we get.

2025-11-01 13:36:30,650 INFO worker.py:2004 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/tom/.local/lib/python3.10/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will no longer override the accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
nodes=1, CPUs~32, chunks=64
total=606028, time=3.04s

Well, the result speaks for itself. The Ray code is 10x faster than the plain Python code. Not too shabby.

Where does this increase in speed come from? Ray can spread your workload across all the cores on your system. A core is like a mini-CPU. When we ran our original Python code, it used just one core. That's fine, but if your CPU has more than one core, which most modern PCs do, you're leaving money on the table, so to speak.

In my case, the CPU has 24 cores, so it's not surprising that my Ray code was way faster than the non-Ray code.

Monitoring Ray jobs

Another point worth mentioning is that Ray makes it very easy to monitor job executions via a dashboard. Notice that in the output from our Ray example code, we saw this,

...  -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265

It's showing a local URL because I'm running this on my desktop. If you were running on a cluster, the URL would point to a location on the cluster head node.

When you click on the given URL, you should see something similar to this,

Image by Author

From this main screen, you can drill down to monitor many aspects of your Ray programs using the menu links at the top of the page.

Using Ray actors

I mentioned earlier that actors are an integral part of Ray Core. Actors are used to coordinate and share state between Ray tasks. For example, say you want to set a global limit that ALL running tasks must respect. Let's say you have a pool of worker tasks, but you want to ensure that, across the whole pool, they make no more than five calls per second in total. Here is some code you might think would work.

import time, ray

ray.init(ignore_reinit_error=True, log_to_driver=False)

GLOBAL_QPS = 5.0          # intended cluster-wide cap
INTERVAL = 1.0 / GLOBAL_QPS
next_time = time.time()   # "shared" pacing state (or so we hope)

def acquire():
    # Wait until the next slot opens, then reserve it
    global next_time
    now = time.time()
    if now < next_time:
        time.sleep(next_time - now)
    next_time = max(next_time + INTERVAL, time.time())

@ray.remote
def call_api_with_limit(n_calls: int) -> int:
    done = 0
    for _ in range(n_calls):
        acquire()   # throttle via the "global" variable
        done += 1   # pretend API call
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH  = 20

    total_calls = NUM_WORKERS * CALLS_EACH

    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0

    print(f"Total calls: {total_calls}")
    print(f"Intended GLOBAL_QPS: {GLOBAL_QPS}")
    print(f"Expected time if truly global-limited: ~{total_calls / GLOBAL_QPS:.2f}s")
    print(f"Actual time with 'global var' (broken): {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls / dt:.1f} (should have been ~{GLOBAL_QPS})")

We have used a global variable to limit the rate of our running tasks, and the code is syntactically correct and runs without error. Unfortunately, you won't get the result you expected. That's because each Ray task runs in its own process space and has its own copy of the global variable. The global variable is NOT shared between workers. So when we run the above code, we'll see output like this,
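You can see this process isolation without Ray at all. In the following stdlib sketch (names are mine), each multiprocessing worker gets its own copy of a module-level variable, and the parent's copy never changes:

```python
from multiprocessing import Pool

COUNTER = 0  # every process gets its own independent copy of this

def bump(_) -> int:
    global COUNTER
    COUNTER += 1
    return COUNTER  # counts only THIS worker's increments

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(bump, range(8))
    # 8 calls ran, but no single copy of COUNTER saw all of them,
    # and the parent's copy is still untouched
    print("parent COUNTER:", COUNTER)  # 0
    print("worker views:", sorted(results))
```

Ray tasks behave the same way: mutating a global inside a task only mutates that worker process's copy.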

Total calls: 200
Intended GLOBAL_QPS: 5.0
Expected time if truly global-limited: ~40.00s
Actual time with 'global var' (broken): 3.80s
Observed cluster QPS: ~52.6 (should have been ~5.0)

To fix this, we use an actor. Recall that an actor is just a Ray-decorated Python class. Here is the code with an actor.

import time, ray

ray.init(ignore_reinit_error=True, log_to_driver=False)

# This is our actor
@ray.remote
class GlobalPacer:
    """Serialize calls so the cluster-wide rate <= qps."""
    def __init__(self, qps: float):
        self.interval = 1.0 / qps
        self.next_time = time.time()

    def acquire(self):
        # Wait inside the actor until we can proceed
        now = time.time()
        if now < self.next_time:
            time.sleep(self.next_time - now)
        # Reserve the next slot; guard against drift
        self.next_time = max(self.next_time + self.interval, time.time())
        return True

@ray.remote
def call_api_with_limit(n_calls: int, pacer):
    done = 0
    for _ in range(n_calls):
        # Wait for global permission
        ray.get(pacer.acquire.remote())
        # pretend API call (no extra sleep here)
        done += 1
    return done

if __name__ == "__main__":
    NUM_WORKERS = 10
    CALLS_EACH  = 20
    GLOBAL_QPS  = 5.0  # cluster-wide cap

    total_calls = NUM_WORKERS * CALLS_EACH
    expected_min_time = total_calls / GLOBAL_QPS

    pacer = GlobalPacer.remote(GLOBAL_QPS)

    t0 = time.time()
    ray.get([call_api_with_limit.remote(CALLS_EACH, pacer) for _ in range(NUM_WORKERS)])
    dt = time.time() - t0

    print(f"Total calls: {total_calls}")
    print(f"Global QPS cap: {GLOBAL_QPS}")
    print(f"Expected time (if capped at {GLOBAL_QPS} QPS): ~{expected_min_time:.2f}s")
    print(f"Actual time with actor: {dt:.2f}s")
    print(f"Observed cluster QPS: ~{total_calls/dt:.1f}")

Our limiter code is encapsulated in a class (GlobalPacer) decorated with @ray.remote, and a single instance of it is shared by all running tasks. We can see the difference this makes by running the updated code.

Total calls: 200
Global QPS cap: 5.0
Expected time (if capped at 5.0 QPS): ~40.00s
Actual time with actor: 39.86s
Observed cluster QPS: ~5.0

Summary

This article introduced Ray, an open-source Python framework that makes it easy to scale compute-intensive programs from a single core to multiple cores, or even a cluster, with minimal code changes.

I briefly covered the key components of Ray (Ray Data, Ray Train, Ray Tune, Ray Serve, and Ray Core), emphasising that Ray Core is ideal for general-purpose CPU scaling.

I explained some of the essential concepts in Ray Core, such as tasks (stateless parallel functions), actors (stateful workers for shared state and coordination), and ObjectRefs (a future promise of a task's return value).

To showcase the benefits of using Ray, I began with a simple CPU-intensive example, counting prime numbers over a range, and showed how slow a naive single-core Python implementation can be.

Instead of rewriting the code in another language or reaching for complex multiprocessing libraries, Ray lets you parallelise the workload in just four simple steps and only a few extra lines of code:

  • ray.init() to start Ray
  • @ray.remote to turn your functions into parallel tasks
  • .remote() to launch tasks concurrently, and
  • ray.get() to collect task results.

This approach cut the runtime of the prime-counting example from ~30 seconds to ~3 seconds on a 24-core machine.

I also showed how easy it is to monitor running jobs with Ray's built-in dashboard and how to access it.

Finally, I provided an example of using a Ray actor, showing why global variables are not suitable for coordinating across tasks, since each worker has its own memory space.

In the second part of this series, we'll take things to another level by enabling Ray jobs to use even more CPU power as we scale to large multi-node servers in the cloud via Amazon Web Services.


