Skip to content

Commit

Permalink
Use "bundle sync" instead of "bundle deploy" for remote run commands (#…
Browse files Browse the repository at this point in the history
…1401)

## Changes
Also changes the continuous sync logic to use the `bundle sync` command
instead of the `sync`.

Depends on the CLI PR: databricks/cli#1853

Sync E2E tests will be failing until we update the CLI

## Tests
Manual and existing unit and e2e tests
  • Loading branch information
ilia-db authored Oct 24, 2024
1 parent 05114fa commit 7afb87f
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,25 @@ export class BundleRemoteStateModel extends BaseModelWithStateCache<BundleRemote
);
}

@Mutex.synchronise("mutex")
public async sync(token: CancellationToken) {
if (this.target === undefined) {
throw new Error("Target is undefined");
}
if (this.authProvider === undefined) {
throw new Error("No authentication method is set");
}

await this.cli.bundleSync(
this.target,
this.authProvider,
this.workspaceFolder,
this.workspaceConfigs.databrickscfgLocation,
this.logger,
token
);
}

public async getRunCommand(resourceKey: string) {
if (this.target === undefined) {
throw new Error("Target is undefined");
Expand Down
22 changes: 4 additions & 18 deletions packages/databricks-vscode/src/cli/CliWrapper.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import * as assert from "assert";
import {Uri} from "vscode";
import {
LocalUri,
RemoteUri,
SyncDestinationMapper,
} from "../sync/SyncDestination";
import {workspaceConfigs} from "../vscode-objs/WorkspaceConfigs";
import {promisify} from "node:util";
import {execFile as execFileCb} from "node:child_process";
Expand Down Expand Up @@ -79,25 +74,16 @@ describe(__filename, function () {
it("should create sync commands", async () => {
const logFilePath = getTempLogFilePath();
const cli = createCliWrapper(logFilePath);
const mapper = new SyncDestinationMapper(
new LocalUri(Uri.file("/user/project")),
new RemoteUri(
Uri.from({
scheme: "wsfs",
path: "/Repos/user@databricks.com/project",
})
)
);

const syncCommand = `${cliPath} sync . /Repos/user@databricks.com/project --watch --output json`;
const syncCommand = `${cliPath} bundle sync --watch --output json`;
const loggingArgs = `--log-level debug --log-file ${logFilePath} --log-format json`;
let {command, args} = cli.getSyncCommand(mapper, "incremental");
let {command, args} = cli.getSyncCommand("incremental");
assert.equal(
[command, ...args].join(" "),
[syncCommand, loggingArgs].join(" ")
);

({command, args} = cli.getSyncCommand(mapper, "full"));
({command, args} = cli.getSyncCommand("full"));
assert.equal(
[command, ...args].join(" "),
[syncCommand, loggingArgs, "--full"].join(" ")
Expand All @@ -106,7 +92,7 @@ describe(__filename, function () {
const configsSpy = spy(workspaceConfigs);
mocks.push(configsSpy);
when(configsSpy.loggingEnabled).thenReturn(false);
({command, args} = cli.getSyncCommand(mapper, "incremental"));
({command, args} = cli.getSyncCommand("incremental"));
assert.equal([command, ...args].join(" "), syncCommand);
});

Expand Down
35 changes: 28 additions & 7 deletions packages/databricks-vscode/src/cli/CliWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
commands,
CancellationToken,
} from "vscode";
import {SyncDestinationMapper} from "../sync/SyncDestination";
import {workspaceConfigs} from "../vscode-objs/WorkspaceConfigs";
import {promisify} from "node:util";
import {logging} from "@databricks/databricks-sdk";
Expand Down Expand Up @@ -335,14 +334,10 @@ export class CliWrapper {
/**
* Constructs the databricks sync command
*/
getSyncCommand(
syncDestination: SyncDestinationMapper,
syncType: SyncType
): Command {
getSyncCommand(syncType: SyncType): Command {
const args = [
"bundle",
"sync",
".",
syncDestination.remoteUri.path,
"--watch",
"--output",
"json",
Expand Down Expand Up @@ -664,6 +659,32 @@ export class CliWrapper {
);
}

async bundleSync(
target: string,
authProvider: AuthProvider,
workspaceFolder: Uri,
configfilePath?: string,
logger?: logging.NamedLogger,
token?: CancellationToken
) {
await commands.executeCommand("databricks.bundle.showLogs");
return await runBundleCommand(
"sync",
this.cliPath,
["bundle", "sync", "--target", target, "--output", "text"],
workspaceFolder,
{
start: `Uploading bundle assets for target ${target}...`,
end: "Bundle assets uploaded successfully.",
error: "Failed to upload bundle assets.",
},
await this.getBundleCommandEnvVars(authProvider, configfilePath),
logger,
{},
token
);
}

async getBundleRunCommand(
target: string,
authProvider: AuthProvider,
Expand Down
17 changes: 4 additions & 13 deletions packages/databricks-vscode/src/cli/SyncTasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ export class LazyCustomSyncTerminal extends CustomSyncTerminal {
Object.defineProperties(this, {
cmd: {
get: () => {
return this.getSyncCommand(ctx).command;
return this.getSyncCommand().command;
},
},
args: {
get: () => {
return this.getSyncCommand(ctx).args;
return this.getSyncCommand().args;
},
},
options: {
Expand Down Expand Up @@ -311,21 +311,12 @@ export class LazyCustomSyncTerminal extends CustomSyncTerminal {
} as SpawnOptions;
}

@withLogContext(Loggers.Extension)
getSyncCommand(@context ctx?: Context): Command {
getSyncCommand(): Command {
if (this.command) {
return this.command;
}
const syncDestination = this.connection.syncDestinationMapper;

if (!syncDestination) {
throw this.showErrorAndKillThis(
"Can't start sync: Databricks synchronization destination not configured!",
ctx
);
}

this.command = this.cli.getSyncCommand(syncDestination, this.syncType);
this.command = this.cli.getSyncCommand(this.syncType);

return this.command;
}
Expand Down
4 changes: 2 additions & 2 deletions packages/databricks-vscode/src/run/DatabricksRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ export class DatabricksRuntime implements Disposable {
throw new Error("No sync destination found");
}

log("Deploying assets to databricks workspace...");
log("Uploading assets to databricks workspace...");
this.state = "SYNCING";
await this.bundleCommands.deploy();
await this.bundleCommands.sync();

await commands.executeCommand("workbench.panel.repl.view.focus");

Expand Down
4 changes: 2 additions & 2 deletions packages/databricks-vscode/src/run/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ export class WorkflowRunner implements Disposable {
}

try {
await this.bundleCommands.deploy();
await this.bundleCommands.sync();
} catch (e: unknown) {
if (e instanceof Error) {
panel.showError({
message: `Can't deploy assets to databricks workspace. \nReason: ${e.message}`,
message: `Can't upload assets to databricks workspace. \nReason: ${e.message}`,
});
}
return;
Expand Down
41 changes: 19 additions & 22 deletions packages/databricks-vscode/src/test/e2e/run_files.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,14 @@ describe("Run files", async function () {
await openFile("hello.py");
});

it("should run a python file on a cluster", async () => {
const workbench = await driver.getWorkbench();
await executeCommandWhenAvailable("Databricks: Upload and Run File");

const debugOutput = await workbench
.getBottomBar()
.openDebugConsoleView();

while (true) {
await sleep(2000);
const text = await (await debugOutput.elem).getHTML();
if (text && text.includes("hello world")) {
break;
}
}
});

it("should cancel a run during deployment", async () => {
const workbench = await driver.getWorkbench();
await executeCommandWhenAvailable("Databricks: Upload and Run File");
await browser.waitUntil(async () => {
const notifications = await workbench.getNotifications();
for (const notification of notifications) {
const message = await notification.getMessage();
if (message.includes("Deploying")) {
// Make sure the CLI is actually spawned before cancelling
await sleep(1000);
if (message.includes("Uploading bundle assets")) {
await notification.takeAction("Cancel");
return true;
}
Expand All @@ -92,9 +73,25 @@ describe("Run files", async function () {
}
});

it("should run a python file as a workflow", async () => {
it("should run a python file on a cluster", async () => {
const workbench = await driver.getWorkbench();
await workbench.executeQuickPick("Databricks: Run File as Workflow");
await executeCommandWhenAvailable("Databricks: Upload and Run File");

const debugOutput = await workbench
.getBottomBar()
.openDebugConsoleView();

while (true) {
await sleep(2000);
const text = await (await debugOutput.elem).getHTML();
if (text && text.includes("hello world")) {
break;
}
}
});

it("should run a python file as a workflow", async () => {
await executeCommandWhenAvailable("Databricks: Run File as Workflow");
await waitForWorkflowWebview("hello world");
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,33 @@ export class BundleCommands implements Disposable {

private deployMutex = new Mutex();

@Mutex.synchronise("deployMutex")
async sync() {
try {
this.whenContext.setDeploymentState("deploying");
await window.withProgress(
{
location: ProgressLocation.Notification,
title: "Uploading bundle assets",
cancellable: true,
},
async (progress, token) => {
await this.bundleRemoteStateModel.sync(token);
}
);
} catch (e) {
if (!(e instanceof Error)) {
throw e;
}
if (e instanceof ProcessError) {
e.showErrorMessage("Error synchronising bundle assets.");
}
throw e;
} finally {
this.whenContext.setDeploymentState("idle");
}
}

@Mutex.synchronise("deployMutex")
async deploy(force = false) {
try {
Expand Down

0 comments on commit 7afb87f

Please sign in to comment.