Fig 8a and 9 #18

Merged
merged 9 commits into from
Jun 3, 2024

Conversation

rodrigo-castellon
Owner

Aiming for replication of Figure 8a and 9.

Fixes so far:

  • increase the max gRPC message size to 1 GB
  • improve logging to use millisecond-precision timestamps
  • calculate transfer time correctly (fix the bandwidth calculation to use division)
  • set numQueuedTasks = 1 when nothing is queued but the running-task queue is full
  • set the default bandwidth to a non-zero value
  • refactor LOBS so that LocationFound() only fills the channel; all parsing is done on the Get() side
    • related: implement a no-copy get() so that we can just block on something instead of having to actually copy
  • allow the client to specify where to send a task
    • to make experimentation easier
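The message-size bump can be sketched as follows (a minimal sketch using gRPC's standard channel arguments; the services here are actually written in Go, where the equivalent is grpc.MaxRecvMsgSize / MaxCallRecvMsgSize):

```python
# Minimal sketch: raising gRPC's 4 MB default message cap to 1 GB.
ONE_GB = 1024 * 1024 * 1024

# These channel arguments must be set on both ends of a connection,
# or large object transfers fail with RESOURCE_EXHAUSTED.
options = [
    ("grpc.max_send_message_length", ONE_GB),
    ("grpc.max_receive_message_length", ONE_GB),
]
# e.g. grpc.server(executor, options=options)
#      grpc.insecure_channel(addr, options=options)
```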

@rodrigo-castellon
Owner Author

rodrigo-castellon commented May 31, 2024

worker1_1           | 2024/05/31 05:07:30.828 [lobs] lobs server listening at [::]:50000
worker1_1           | 2024/05/31 05:07:30.828 [worker] worker server listening at [::]:50002
global_scheduler_1  | 2024/05/31 05:07:30.516 server listening at [::]:50000
worker1_1           | 2024/05/31 05:07:30.830 [localscheduler] localsched server listening at [::]:50001
worker2_1           | 2024/05/31 05:07:30.558 [lobs] lobs server listening at [::]:50000
gcs_1               | 2024/05/31 05:07:30.513 server listening at [::]:50001
worker3_1           | 2024/05/31 05:07:30.478 [worker] worker server listening at [::]:50002
worker3_1           | 2024/05/31 05:07:30.480 [lobs] lobs server listening at [::]:50000
worker2_1           | 2024/05/31 05:07:30.559 [worker] worker server listening at [::]:50002
gcs_1               | 2024/05/31 05:07:30 server listening at [::]:50000
worker3_1           | 2024/05/31 05:07:30.483 [localscheduler] localsched server listening at [::]:50001
worker2_1           | 2024/05/31 05:07:30.564 [localscheduler] localsched server listening at [::]:50001
worker1_1           | 2024/05/31 05:07:32.895 [worker] in Run() rn
worker1_1           | 2024/05/31 05:07:32.895 [worker] ....fetching?
worker1_1           | 2024/05/31 05:07:32.896 [worker] the err was <nil>
worker1_1           | 2024/05/31 05:07:32.896 [worker] gonna execute now
worker1_1           | 2024/05/31 05:07:32.896 [worker] Time to prepare command: 8.459µs
worker1_1           | 2024/05/31 05:07:32.896 [worker] Time to encode inputs: 958ns
worker1_1           | 2024/05/31 05:07:32.896 [worker] Time to write to buffer: 500ns
worker1_1           | 2024/05/31 05:07:32.897 [lobs] IN GET() RN!
worker1_1           | 2024/05/31 05:07:32.952 [worker] Time to execute command: 55.499666ms
worker1_1           | 2024/05/31 05:07:32.952 [worker] Time to decode output: 708ns
worker1_1           | 2024/05/31 05:07:32.952 Total execution time: 55.699209ms
worker1_1           | 2024/05/31 05:07:32.952 [worker] Executed!
worker1_1           | 2024/05/31 05:07:32.952 [worker] gonna store now
gcs_1               | 2024/05/31 05:07:32.953 WAS JUST NOTIFYOWNS()ED
worker1_1           | 2024/05/31 05:07:32.953 [worker] stored...
worker1_1           | 2024/05/31 05:07:32.953 [lobs] GOT THE RESPONSE!
worker1_1           | 2024/05/31 05:07:32.954 [lobs] CALLING COPY() ON THIS NODE...
worker1_1           | 2024/05/31 05:07:32.954 [lobs] the err was: <nil>
worker1_1           | 2024/05/31 05:07:32.954 [lobs] GETTING THE TOTAL BANDWIDTH FROM THIS...
gcs_1               | 2024/05/31 05:07:32.955 WAS JUST NOTIFYOWNS()ED
worker1_1           | 2024/05/31 05:07:32.955 elapsed time: 0.06113100051879883

output from running the below code:

import time
# `remote`, `get`, and `log` come from this project's Python client API.

@remote
def dummy_func():
    # bytearr = get(fut)

    return 10


@remote
def sleeper_func():
    time.sleep(999999)
    return ""


NUM_TRIALS = 1

start = time.time()
out = get(dummy_func.remote())
elapsed = time.time() - start

log("elapsed time:", elapsed)
exit()

with the following Go executeFunction() profiling code:

// Imports needed by this excerpt: bytes, encoding/base64, log, os,
// os/exec, path/filepath, time. LocalLog is this project's logger.
func executeFunction(f []byte, args []byte, kwargs []byte) ([]byte, error) {
    startTime := time.Now()

    // Prepare the command to run the Python script
    rootPath := os.Getenv("PROJECT_ROOT")
    executeFile := filepath.Join(rootPath, "go", "cmd", "worker", "execute.py")
    cmd := exec.Command("/usr/bin/python3", executeFile)

    LocalLog("Time to prepare command: %v\n", time.Since(startTime))

    // Create a buffer to hold the serialized data
    inputBuffer := bytes.NewBuffer(nil)

    // Assume data is the serialized data you want to encode in base64
    encodeStartTime := time.Now()
    fB64 := []byte(base64.StdEncoding.EncodeToString(f))
    argsB64 := []byte(base64.StdEncoding.EncodeToString(args))
    kwargsB64 := []byte(base64.StdEncoding.EncodeToString(kwargs))

    LocalLog("Time to encode inputs: %v\n", time.Since(encodeStartTime))

    // Write the function, args, and kwargs to the buffer
    bufferWriteStartTime := time.Now()
    inputBuffer.Write(fB64)
    inputBuffer.WriteByte('\n')
    inputBuffer.Write(argsB64)
    inputBuffer.WriteByte('\n')
    inputBuffer.Write(kwargsB64)

    LocalLog("Time to write to buffer: %v\n", time.Since(bufferWriteStartTime))

    // Set the stdin to our input buffer
    cmd.Stdin = inputBuffer

    // Capture the output
    cmdExecStartTime := time.Now()
    output, err := cmd.Output()
    if err != nil {
        log.Fatalf("Error executing function: %v", err)
    }

    LocalLog("Time to execute command: %v\n", time.Since(cmdExecStartTime))

    // Decode the Base64 output to get the original pickled data
    // LocalLog("the output from this was: %s", string(output)[:min(len(string(output)), 282)])
    // LocalLog("the length of the overall string was: %v", len(string(output)))
    decodeStartTime := time.Now()
    data, err := base64.StdEncoding.DecodeString(string(output))
    if err != nil {
        log.Fatalf("Error decoding Base64: %v", err)
    }
    LocalLog("Time to decode output: %v\n", time.Since(decodeStartTime))

    totalTime := time.Since(startTime)
    log.Printf("Total execution time: %v\n", totalTime)

    // Return the output from the Python script
    return data, nil
}
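For reference, the execute.py side of this stdin/stdout protocol plausibly looks like the sketch below (a hypothetical reconstruction assuming pickle serialization; `handle` is an illustrative name, not the actual script):

```python
import base64
import pickle

def handle(stdin_lines):
    """Hypothetical sketch of the execute.py protocol: three base64
    lines on stdin (pickled function, args, kwargs), one base64 line
    of pickled output on stdout."""
    f_b64, args_b64, kwargs_b64 = stdin_lines
    func = pickle.loads(base64.b64decode(f_b64))
    args = pickle.loads(base64.b64decode(args_b64))
    kwargs = pickle.loads(base64.b64decode(kwargs_b64))
    result = func(*args, **kwargs)
    # Base64 on stdout matches the DecodeString() call on the Go side.
    return base64.b64encode(pickle.dumps(result)).decode()
```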

Shows that Python is hella slow when you exec it from Go: of the ~55.7ms total, ~55.5ms is the command execution itself, i.e. almost all of it is Python process startup rather than the (empty) function.

Because of this, I'm rewriting the worker service in Python so that we can execute Python functions directly, without any overhead from Python process startup.

Update: seems like we dropped the execution of an empty dummy function from ~60ms to ~5ms. :)
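The overhead is easy to reproduce on its own (a minimal standalone sketch, not this project's code):

```python
import subprocess
import sys
import time

def dummy_func():
    return 10

# In-process call: effectively free.
start = time.time()
result = dummy_func()
in_process = time.time() - start

# One fresh interpreter per task: pays the full Python startup cost
# (typically tens of milliseconds).
start = time.time()
subprocess.run([sys.executable, "-c", "print(10)"],
               capture_output=True, check=True)
via_subprocess = time.time() - start
```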

@rodrigo-castellon
Owner Author

rodrigo-castellon commented May 31, 2024

Running into timing issues: task requests can reach the global scheduler before it has received any heartbeats from workers. This happens when we run >10 tasks, so tasks start hitting the global scheduler before heartbeats have arrived (which can happen if the LOBS takes a bit of time to start up and the retry is delayed).

One potential solution: add synchronization gRPC messages. There is a well-defined service dependency graph, so each service could broadcast a Ready() message to the child services that depend on it.

Another potential solution: just use ZooKeeper for coordination.

Update: not worth the effort. Put on the backburner.
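The Ready() idea could be sketched like this (hypothetical; `ReadinessGate` and its method names are illustrative, not project code):

```python
import threading

class ReadinessGate:
    """Blocks dependents until an upstream dependency signals ready."""

    def __init__(self):
        self._ready = threading.Event()

    def mark_ready(self):
        # Called when the dependency is up, e.g. on the first
        # worker heartbeat, or on receiving a Ready() RPC.
        self._ready.set()

    def wait_ready(self, timeout=None):
        # Dependents (e.g. the global scheduler's task-request path)
        # block here instead of failing on missing state.
        return self._ready.wait(timeout)
```

With this, the global scheduler would hold incoming task requests on wait_ready() until the first heartbeat arrives, instead of scheduling against an empty worker table.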

@rodrigo-castellon
Owner Author

Latest commit is just last-mile stuff for replicating Fig 8a.

@rodrigo-castellon
Owner Author

FYI: the latest changes with mmap break the unit tests, because we no longer return the raw bytes but just the size of the object (so that the getter can read it from shared memory in Python instead).
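The size-plus-shared-memory handoff can be sketched with Python's stdlib (a minimal sketch using multiprocessing.shared_memory; the project's actual mmap layout and naming scheme may differ):

```python
from multiprocessing import shared_memory

payload = b"hello object store"

# Writer side (worker): store the object bytes in a shared segment.
shm = shared_memory.SharedMemory(create=True, size=len(payload))
shm.buf[: len(payload)] = payload

# Getter side: the RPC returns only (segment name, size); the getter
# reads the bytes straight from shared memory instead of copying
# them over gRPC.
reader = shared_memory.SharedMemory(name=shm.name)
data = bytes(reader.buf[: len(payload)])

reader.close()
shm.close()
shm.unlink()
```

Note that the size must travel with the name: the OS may round the segment up, so the getter cannot infer the object length from the segment alone.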

@rodrigo-castellon rodrigo-castellon merged commit 0be7190 into main Jun 3, 2024
0 of 2 checks passed