diff --git a/build.sbt b/build.sbt index 3767e7a6c21..27e02e1deeb 100644 --- a/build.sbt +++ b/build.sbt @@ -167,6 +167,16 @@ lazy val WorkflowExecutionService = (project in file("amber")) ) .configs(Test) .dependsOn(DAO % "test->test", Auth % "test->test") // test scope dependency +lazy val NotebookMigrationService = (project in file("notebook-migration-service")) + .dependsOn(Auth, Config, DAO) + .settings(asfLicensingSettings) + .settings( + dependencyOverrides ++= Seq( + // override it as io.dropwizard 4 require 2.16.1 or higher + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.17.0" + ) + ) + .dependsOn(DAO % "test->test") // test scope dependency // root project definition lazy val TexeraProject = (project in file(".")) @@ -184,7 +194,8 @@ lazy val TexeraProject = (project in file(".")) ConfigService, FileService, WorkflowCompilingService, - WorkflowExecutionService + WorkflowExecutionService, + NotebookMigrationService ) .settings( name := "texera", diff --git a/common/config/src/main/resources/storage.conf b/common/config/src/main/resources/storage.conf index 0682109f19e..4fdcf0c1694 100644 --- a/common/config/src/main/resources/storage.conf +++ b/common/config/src/main/resources/storage.conf @@ -146,4 +146,11 @@ storage { password = "postgres" password = ${?STORAGE_JDBC_PASSWORD} } + + # Configurations of the JupyterLab service + # Default values are provided for each field, which you don't need to change if you deployed Jupyter via docker-compose.yml in notebook-migration-service/src/main/resources/docker-compose.yml + jupyter { + url = "http://localhost:9100" + url = ${?STORAGE_JUPYTER_URL} + } } diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala index 07447cfdbee..bbbc9ffbaee 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala @@ -132,4 +132,7 @@ object StorageConfig { val ENV_S3_REGION = "STORAGE_S3_REGION" val ENV_S3_AUTH_USERNAME = "STORAGE_S3_AUTH_USERNAME" val ENV_S3_AUTH_PASSWORD = "STORAGE_S3_AUTH_PASSWORD" + + // Jupyter + val jupyterURL: String = conf.getString("storage.jupyter.url") } diff --git a/frontend/proxy.config.json b/frontend/proxy.config.json index f68602e0714..7d68961d9b6 100755 --- a/frontend/proxy.config.json +++ b/frontend/proxy.config.json @@ -10,6 +10,11 @@ "secure": false, "changeOrigin": true }, + "/api/notebook-migration": { + "target": "http://localhost:9098", + "secure": false, + "changeOrigin": true + }, "/api/models": { "target": "http://localhost:9096", "secure": false, diff --git a/notebook-migration-service/build.sbt b/notebook-migration-service/build.sbt new file mode 100644 index 00000000000..f744653c63c --- /dev/null +++ b/notebook-migration-service/build.sbt @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import scala.collection.Seq + +name := "notebook-migration-service" + + +enablePlugins(JavaAppPackaging) + +// Enable semanticdb for Scalafix +ThisBuild / semanticdbEnabled := true +ThisBuild / semanticdbVersion := scalafixSemanticdb.revision + +// Manage dependency conflicts by always using the latest revision +ThisBuild / conflictManager := ConflictManager.latestRevision + +// Restrict parallel execution of tests to avoid conflicts +Global / concurrentRestrictions += Tags.limit(Tags.Test, 1) + +///////////////////////////////////////////////////////////////////////////// +// Compiler Options +///////////////////////////////////////////////////////////////////////////// + +// Scala compiler options +Compile / scalacOptions ++= Seq( + "-Xelide-below", "WARNING", // Turn on optimizations with "WARNING" as the threshold + "-feature", // Check feature warnings + "-deprecation", // Check deprecation warnings + "-Ywarn-unused:imports" // Check for unused imports +) + +///////////////////////////////////////////////////////////////////////////// +// Version Variables +///////////////////////////////////////////////////////////////////////////// + +val dropwizardVersion = "4.0.7" +val mockitoVersion = "5.4.0" +val assertjVersion = "3.24.2" + +///////////////////////////////////////////////////////////////////////////// +// Test-related Dependencies +///////////////////////////////////////////////////////////////////////////// + +libraryDependencies ++= Seq( + "org.scalamock" %% "scalamock" % "5.2.0" % Test, // ScalaMock + "org.scalatest" %% "scalatest" % "3.2.17" % Test, // ScalaTest + "io.dropwizard" % "dropwizard-testing" % dropwizardVersion % Test, // Dropwizard Testing + "org.mockito" % "mockito-core" % mockitoVersion % Test, // Mockito for mocking + "org.assertj" % "assertj-core" % assertjVersion % Test, // AssertJ for assertions + "com.novocode" % "junit-interface" % "0.11" % Test // SBT interface for JUnit +) + +///////////////////////////////////////////////////////////////////////////// +// Dependencies +///////////////////////////////////////////////////////////////////////////// + +// Core Dependencies +libraryDependencies ++= Seq( + "io.dropwizard" % "dropwizard-core" % dropwizardVersion, + "io.dropwizard" % "dropwizard-auth" % dropwizardVersion, // Dropwizard Authentication module + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.18.6" +) \ No newline at end of file diff --git a/notebook-migration-service/project/build.properties b/notebook-migration-service/project/build.properties new file mode 100644 index 00000000000..d0ba9f1421a --- /dev/null +++ b/notebook-migration-service/project/build.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +sbt.version = 1.12.9 \ No newline at end of file diff --git a/notebook-migration-service/src/main/resources/logback.xml b/notebook-migration-service/src/main/resources/logback.xml new file mode 100644 index 00000000000..ffcc406a752 --- /dev/null +++ b/notebook-migration-service/src/main/resources/logback.xml @@ -0,0 +1,55 @@ + + + + + + + [%date{ISO8601}] [%level] [%logger] [%thread] - %msg %n + + + + + + + logs/notebook-migration-service.log + true + + logs/notebook-migration-service-%d{yyyy-MM-dd}.log.gz + + + [%date{ISO8601}] [%level] [%logger] [%thread] - %msg %n + + + + + 8192 + true + + + + + + + + + + + \ No newline at end of file diff --git a/notebook-migration-service/src/main/resources/notebook-migration-service-web-config.yaml b/notebook-migration-service/src/main/resources/notebook-migration-service-web-config.yaml new file mode 100644 index 00000000000..947dd59c4f5 --- /dev/null +++ b/notebook-migration-service/src/main/resources/notebook-migration-service-web-config.yaml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +server: + applicationConnectors: + - type: http + port: 9098 + adminConnectors: [] + requestLog: + type: classic + appenders: [] + +logging: + level: ${TEXERA_SERVICE_LOG_LEVEL:-INFO} + appenders: + - type: console + threshold: ${TEXERA_SERVICE_LOG_LEVEL:-INFO} + - type: file + currentLogFilename: logs/notebook-migration-service.log + archive: true + archivedLogFilenamePattern: logs/notebook-migration-service-%d.log.gz + archivedFileCount: 5 \ No newline at end of file diff --git a/notebook-migration-service/src/main/scala/org/apache/texera/service/NotebookMigrationService.scala b/notebook-migration-service/src/main/scala/org/apache/texera/service/NotebookMigrationService.scala new file mode 100644 index 00000000000..9effd07436e --- /dev/null +++ b/notebook-migration-service/src/main/scala/org/apache/texera/service/NotebookMigrationService.scala @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.texera.service + +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.typesafe.scalalogging.LazyLogging +import io.dropwizard.configuration.{EnvironmentVariableSubstitutor, SubstitutingSourceProvider} +import io.dropwizard.core.Application +import io.dropwizard.core.setup.{Bootstrap, Environment} +import org.apache.texera.amber.config.StorageConfig +import org.apache.texera.auth.RequestLoggingFilter +import org.apache.texera.dao.SqlServer +import java.nio.file.Path +import org.apache.texera.service.resource.NotebookMigrationResource + +class NotebookMigrationService + extends Application[NotebookMigrationServiceConfiguration] + with LazyLogging { + override def initialize(bootstrap: Bootstrap[NotebookMigrationServiceConfiguration]): Unit = { + // enable environment variable substitution in YAML config + bootstrap.setConfigurationSourceProvider( + new SubstitutingSourceProvider( + bootstrap.getConfigurationSourceProvider, + new EnvironmentVariableSubstitutor(false) + ) + ) + // Register Scala module to Dropwizard default object mapper + bootstrap.getObjectMapper.registerModule(DefaultScalaModule) + + SqlServer.initConnection( + StorageConfig.jdbcUrl, + StorageConfig.jdbcUsername, + StorageConfig.jdbcPassword + ) + } + + override def run( + configuration: NotebookMigrationServiceConfiguration, + environment: Environment + ): Unit = { + // Serve backend at /api + environment.jersey.setUrlPattern("/api/*") + + environment.jersey.register(classOf[NotebookMigrationResource]) + + // Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL + RequestLoggingFilter.register(environment.getApplicationContext) + } +} +object NotebookMigrationService { + def main(args: Array[String]): Unit = { + val notebookMigrationPath = Path + .of(sys.env.getOrElse("TEXERA_HOME", ".")) + .resolve("notebook-migration-service") + .resolve("src") + .resolve("main") + .resolve("resources") + .resolve("notebook-migration-service-web-config.yaml") + .toAbsolutePath + .toString + + // Start the Dropwizard application + new NotebookMigrationService().run("server", notebookMigrationPath) + } +} diff --git a/notebook-migration-service/src/main/scala/org/apache/texera/service/NotebookMigrationServiceConfiguration.scala b/notebook-migration-service/src/main/scala/org/apache/texera/service/NotebookMigrationServiceConfiguration.scala new file mode 100644 index 00000000000..dc908e7565e --- /dev/null +++ b/notebook-migration-service/src/main/scala/org/apache/texera/service/NotebookMigrationServiceConfiguration.scala @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.texera.service + +import io.dropwizard.core.Configuration + +class NotebookMigrationServiceConfiguration extends Configuration {} diff --git a/notebook-migration-service/src/main/scala/org/apache/texera/service/resource/NotebookMigrationResource.scala b/notebook-migration-service/src/main/scala/org/apache/texera/service/resource/NotebookMigrationResource.scala new file mode 100644 index 00000000000..6b06d883619 --- /dev/null +++ b/notebook-migration-service/src/main/scala/org/apache/texera/service/resource/NotebookMigrationResource.scala @@ -0,0 +1,358 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.texera.service.resource + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.typesafe.scalalogging.LazyLogging +import jakarta.ws.rs._ +import jakarta.ws.rs.core._ +import org.apache.texera.dao.SqlServer +import org.jooq.JSONB +import org.apache.texera.dao.jooq.generated.tables.Notebook +import org.apache.texera.dao.jooq.generated.tables.WorkflowNotebookMapping +import java.net.{HttpURLConnection, URL} +import java.nio.charset.StandardCharsets +import scala.util.control.NonFatal +import org.apache.texera.amber.config.StorageConfig + +object NotebookMigrationResource extends LazyLogging { + + private val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule) + private val jupyterUrl = StorageConfig.jupyterURL + private var jupyterIframeURL = s"$jupyterUrl/notebooks/work/notebook.ipynb" + + private def isJupyterAvailable(jupyterUrl: String): Boolean = { + try { + val conn = new java.net.URL(s"$jupyterUrl/api") + .openConnection() + .asInstanceOf[java.net.HttpURLConnection] + + conn.setRequestMethod("GET") + conn.setConnectTimeout(2000) + conn.setReadTimeout(2000) + + val status = conn.getResponseCode + + status == 200 || status == 403 + } catch { + case _: Exception => false + } + } + + // Returns the Jupyter iframe reference URL + def getJupyterIframeURL(): Response = { + if (!isJupyterAvailable(jupyterUrl)) { + return Response + .status(500) + .entity( + """ + { + "success": false, + "message": "Cannot connect to Jupyter server" + } + """ + ) + .build() + } + + Response + .ok( + s""" + { + "success": true, + "url": "$jupyterIframeURL" + } + """ + ) + .build() + } + + // Returns the URL of Jupyter + def getJupyterURL(): Response = { + if (!isJupyterAvailable(jupyterUrl)) { + return Response + .status(500) + .entity( + """ + { + "success": false, + "message": "Cannot connect to Jupyter server" + } + """ + ) + .build() + } + + Response + .ok( + s""" + { + "success": true, + "url": "$jupyterUrl" + } + """ + ) + .build() + } + + // Set the notebook in Jupyter + def setNotebook(body: String): Response = { + if (!isJupyterAvailable(jupyterUrl)) { + return Response + .status(500) + .entity( + """ + { + "success": false, + "message": "Cannot connect to Jupyter server" + } + """ + ) + .build() + } + + try { + val json = mapper.readTree(body) + + val notebookName = json.get("notebookName").asText() + val notebookData = json.get("notebookData") + + // Construct Jupyter API URL + val apiUrl = s"$jupyterUrl/api/contents/work/$notebookName" + + val url = new URL(apiUrl) + val conn = url.openConnection().asInstanceOf[HttpURLConnection] + + conn.setRequestMethod("PUT") + conn.setDoOutput(true) + conn.setRequestProperty("Content-Type", "application/json") + + val requestBody = + s""" + { + "type": "notebook", + "content": $notebookData + } + """ + + val os = conn.getOutputStream + os.write(requestBody.getBytes(StandardCharsets.UTF_8)) + os.flush() + os.close() + + val status = conn.getResponseCode + + if (status != 200 && status != 201) { + return Response + .status(500) + .entity( + s""" + { + "success": false, + "message": "Failed to upload notebook to Jupyter (status $status)" + } + """ + ) + .build() + } + + jupyterIframeURL = s"$jupyterUrl/notebooks/work/notebook.ipynb" + + Response + .ok( + s""" + { + "success": true, + "message": "Notebook successfully sent to Jupyter." + } + """ + ) + .build() + + } catch { + case NonFatal(e) => + logger.error("Error sending notebook to Jupyter", e) + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"${e.getMessage}"}""") + .build() + } + } + + // Store notebook + mapping in database + def storeNotebookAndMapping(body: String): Response = { + try { + val json = mapper.readTree(body) + + val wid: java.lang.Integer = json.get("wid").asInt() + val vid: java.lang.Integer = json.get("vid").asInt() + val mappingNode = json.get("mapping") + val notebookNode = json.get("notebook") + + val dsl = SqlServer.getInstance().createDSLContext() + + val nid: java.lang.Integer = SqlServer.withTransaction(dsl) { ctx => + // Insert notebook + val notebookRecord = ctx + .insertInto(Notebook.NOTEBOOK) + .set(Notebook.NOTEBOOK.WID, wid) + .set(Notebook.NOTEBOOK.NOTEBOOK_, JSONB.valueOf(notebookNode.toString)) + .returning(Notebook.NOTEBOOK.NID) + .fetchOne() + + val nidInside: java.lang.Integer = notebookRecord.getValue(Notebook.NOTEBOOK.NID) + + // Insert workflow-notebook mapping + ctx + .insertInto(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING) + .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.WID, wid) + .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.VID, vid) + .set(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.NID, nidInside) + .set( + WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.MAPPING, + JSONB.valueOf(mappingNode.toString) + ) + .execute() + + nidInside + } + + Response + .ok( + s""" + { + "success": true, + "message": "Notebook and mapping successfully stored. wid: $wid, vid: $vid, nid: $nid" + } + """ + ) + .build() + + } catch { + case NonFatal(e) => + logger.error("Error storing mapping and workflow", e) + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"${e.getMessage}"}""") + .build() + } + } + + // Fetch notebook + mapping + def fetchNotebookAndMapping(body: String): Response = { + try { + val json = mapper.readTree(body) + + val wid: java.lang.Integer = json.get("wid").asInt() + val vid: java.lang.Integer = json.get("vid").asInt() + + val dsl = SqlServer.getInstance().createDSLContext() + + // Fetch the most recent notebook (highest nid) for this workflow version + val result = dsl + .select( + Notebook.NOTEBOOK.NID, + Notebook.NOTEBOOK.NOTEBOOK_, + WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.MAPPING + ) + .from(Notebook.NOTEBOOK) + .join(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING) + .on(Notebook.NOTEBOOK.WID.eq(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.WID)) + .and(Notebook.NOTEBOOK.NID.eq(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.NID)) + .where(Notebook.NOTEBOOK.WID.eq(wid)) + .and(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.VID.eq(vid)) + .orderBy(Notebook.NOTEBOOK.NID.desc()) // most recent nid first + .limit(1) // only take the latest + .fetchOne() + + if (result == null) { + Response.ok("""{"exists": false}""").build() + } else { + val nid: Int = result.getValue(Notebook.NOTEBOOK.NID) + val notebookJson: String = + result.get(Notebook.NOTEBOOK.NOTEBOOK_).asInstanceOf[JSONB].data() + val mappingJson: String = result + .get(WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING.MAPPING) + .asInstanceOf[JSONB] + .data() + + Response + .ok( + s""" + { + "exists": true, + "notebook": $notebookJson, + "mapping": $mappingJson + } + """ + ) + .build() + } + + } catch { + case NonFatal(e) => + logger.error("Database error retrieving mapping", e) + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(s"""{"error":"${e.getMessage}"}""") + .build() + } + } +} + +@Path("/notebook-migration") +@Produces(Array(MediaType.APPLICATION_JSON)) +@Consumes(Array(MediaType.APPLICATION_JSON)) +class NotebookMigrationResource extends LazyLogging { + + @GET + @Path("/get-jupyter-iframe-url") + def getJupyterIframeURL: Response = { + logger.info("Getting Jupyter iframe URL") + NotebookMigrationResource.getJupyterIframeURL() + } + + @GET + @Path("/get-jupyter-url") + def getJupyterURL: Response = { + logger.info("Getting Jupyter API URL") + NotebookMigrationResource.getJupyterURL() + } + + @POST + @Path("/set-notebook") + def setNotebook(body: String): Response = { + logger.info("Setting notebook, request body: " + body) + NotebookMigrationResource.setNotebook(body) + } + + @POST + @Path("/store-notebook-and-mapping") + def storeNotebookAndMapping(body: String): Response = { + logger.info("Storing notebook and mapping, request body: " + body) + NotebookMigrationResource.storeNotebookAndMapping(body) + } + + @POST + @Path("/fetch-notebook-and-mapping") + def fetchNotebookAndMapping(body: String): Response = { + logger.info("Fetching notebook and mapping, request body: " + body) + NotebookMigrationResource.fetchNotebookAndMapping(body) + } +} diff --git a/notebook-migration-service/src/test/scala/org/apache/texera/service/resource/NotebookMigrationResourceSpec.scala b/notebook-migration-service/src/test/scala/org/apache/texera/service/resource/NotebookMigrationResourceSpec.scala new file mode 100644 index 00000000000..61544bf65d2 --- /dev/null +++ b/notebook-migration-service/src/test/scala/org/apache/texera/service/resource/NotebookMigrationResourceSpec.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.resource + +import jakarta.ws.rs.core.Response +import org.apache.texera.dao.MockTexeraDB +import org.apache.texera.dao.jooq.generated.tables.Notebook.NOTEBOOK +import org.apache.texera.dao.jooq.generated.tables.Workflow.WORKFLOW +import org.apache.texera.dao.jooq.generated.tables.WorkflowNotebookMapping.WORKFLOW_NOTEBOOK_MAPPING +import org.apache.texera.dao.jooq.generated.tables.WorkflowVersion.WORKFLOW_VERSION +import org.apache.texera.dao.jooq.generated.tables.daos.{WorkflowDao, WorkflowVersionDao} +import org.apache.texera.dao.jooq.generated.tables.pojos.{Workflow, WorkflowVersion} +import org.jooq.JSONB +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import java.sql.Timestamp +import java.util.UUID + +class NotebookMigrationResourceSpec + extends AnyFlatSpec + with Matchers + with BeforeAndAfterAll + with BeforeAndAfterEach + with MockTexeraDB { + + // Randomise the seeded wid so a parallel run of unrelated specs that happen + // to seed the same id wouldn't collide on the embedded postgres. + private val testWid = 9000 + scala.util.Random.nextInt(1000) + + private var workflowDao: WorkflowDao = _ + private var workflowVersionDao: WorkflowVersionDao = _ + private var seededVid: Integer = _ + + private val sampleNotebook = + """{"cells":[{"cell_type":"code","metadata":{},"source":"print(1)"}]}""" + private val sampleMapping = + """{"operator_to_cell":{},"cell_to_operator":{}}""" + + override protected def beforeAll(): Unit = initializeDBAndReplaceDSLContext() + override protected def afterAll(): Unit = shutdownDB() + + override protected def beforeEach(): Unit = { + val cfg = getDSLContext.configuration() + workflowDao = new WorkflowDao(cfg) + workflowVersionDao = new WorkflowVersionDao(cfg) + cleanup() + + val workflow = new Workflow + workflow.setWid(testWid) + workflow.setName(s"wf_${UUID.randomUUID().toString.substring(0, 8)}") + workflow.setContent("{}") + workflow.setDescription("") + workflow.setCreationTime(new Timestamp(System.currentTimeMillis())) + workflow.setLastModifiedTime(new Timestamp(System.currentTimeMillis())) + workflowDao.insert(workflow) + + val version = new WorkflowVersion + version.setWid(testWid) + version.setContent("{}") + version.setCreationTime(new Timestamp(System.currentTimeMillis())) + workflowVersionDao.insert(version) + seededVid = version.getVid + } + + override protected def afterEach(): Unit = cleanup() + + private def cleanup(): Unit = { + // notebook and workflow_notebook_mapping cascade on workflow/version delete, + // but explicit deletes here keep state observable across tests and avoid + // depending on cascade ordering. + getDSLContext.deleteFrom(WORKFLOW_NOTEBOOK_MAPPING).execute() + getDSLContext.deleteFrom(NOTEBOOK).execute() + getDSLContext + .deleteFrom(WORKFLOW_VERSION) + .where(WORKFLOW_VERSION.WID.eq(testWid)) + .execute() + getDSLContext.deleteFrom(WORKFLOW).where(WORKFLOW.WID.eq(testWid)).execute() + } + + private def storePayload( + notebook: String = sampleNotebook, + mapping: String = sampleMapping, + vid: Integer = seededVid + ): String = + s"""{"wid": $testWid, "vid": $vid, "notebook": $notebook, "mapping": $mapping}""" + + private def fetchPayload(vid: Integer = seededVid): String = + s"""{"wid": $testWid, "vid": $vid}""" + + // -- storeNotebookAndMapping ------------------------------------------------ + + "storeNotebookAndMapping" should "insert one notebook and one mapping tied to the workflow version" in { + val response = NotebookMigrationResource.storeNotebookAndMapping(storePayload()) + response.getStatus shouldBe Response.Status.OK.getStatusCode + + getDSLContext.fetchCount(NOTEBOOK) shouldBe 1 + getDSLContext.fetchCount(WORKFLOW_NOTEBOOK_MAPPING) shouldBe 1 + + val notebookRow = getDSLContext.selectFrom(NOTEBOOK).fetchOne() + notebookRow.get(NOTEBOOK.WID) shouldBe testWid + + val mappingRow = getDSLContext.selectFrom(WORKFLOW_NOTEBOOK_MAPPING).fetchOne() + mappingRow.get(WORKFLOW_NOTEBOOK_MAPPING.WID) shouldBe testWid + mappingRow.get(WORKFLOW_NOTEBOOK_MAPPING.VID) shouldBe seededVid + // The mapping row must reference the just-inserted notebook by its returned nid. + mappingRow.get(WORKFLOW_NOTEBOOK_MAPPING.NID) shouldBe notebookRow.get(NOTEBOOK.NID) + } + + it should "round-trip notebook and mapping JSON content through the JSONB columns" in { + val notebook = + """{"cells":[{"cell_type":"code","metadata":{"uuid":"abc-123"},"source":"x = 1"}]}""" + val mapping = + """{"operator_to_cell":{"op1":["cell1"]},"cell_to_operator":{"cell1":["op1"]}}""" + + NotebookMigrationResource.storeNotebookAndMapping(storePayload(notebook, mapping)) + + val storedNotebookJson = + getDSLContext.selectFrom(NOTEBOOK).fetchOne().get(NOTEBOOK.NOTEBOOK_).asInstanceOf[JSONB].data() + val storedMappingJson = + getDSLContext + .selectFrom(WORKFLOW_NOTEBOOK_MAPPING) + .fetchOne() + .get(WORKFLOW_NOTEBOOK_MAPPING.MAPPING) + .asInstanceOf[JSONB] + .data() + + // Use whitespace-agnostic substring checks — postgres canonicalises JSONB + // text on the way out, so exact-string compare against the input would be + // fragile across postgres versions. + storedNotebookJson should include("\"abc-123\"") + storedNotebookJson should include("x = 1") + storedMappingJson should include("\"op1\"") + storedMappingJson should include("\"cell1\"") + } + + it should "roll back the notebook insert when the mapping insert fails its FK constraint" in { + // workflow_notebook_mapping.vid has FK -> workflow_version(vid). Passing an + // unknown vid trips the mapping insert; because both inserts share a single + // SqlServer.withTransaction block, the notebook insert must roll back too. + // Without this guarantee, orphaned notebook rows would accumulate on every + // failed store. + val unknownVid: Integer = -1 + val response = NotebookMigrationResource.storeNotebookAndMapping( + storePayload(vid = unknownVid) + ) + response.getStatus shouldBe Response.Status.INTERNAL_SERVER_ERROR.getStatusCode + getDSLContext.fetchCount(NOTEBOOK) shouldBe 0 + getDSLContext.fetchCount(WORKFLOW_NOTEBOOK_MAPPING) shouldBe 0 + } + + // -- fetchNotebookAndMapping ------------------------------------------------ + + "fetchNotebookAndMapping" should "return exists=false when no notebook is stored for the (wid, vid)" in { + val response = NotebookMigrationResource.fetchNotebookAndMapping(fetchPayload()) + response.getStatus shouldBe Response.Status.OK.getStatusCode + response.getEntity.toString should include("\"exists\": false") + } + + it should "return exists=true with the stored notebook and mapping when a row exists" in { + NotebookMigrationResource.storeNotebookAndMapping(storePayload()) + + val entity = NotebookMigrationResource.fetchNotebookAndMapping(fetchPayload()).getEntity.toString + entity should include("\"exists\": true") + entity should include("\"notebook\":") + entity should include("\"mapping\":") + } + + it should "return the most recent notebook (highest nid) when multiple are stored for the same (wid, vid)" in { + // The endpoint uses orderBy(NID.desc).limit(1) to surface the latest + // notebook. Pin that ordering — a refactor that drops the limit or flips + // to ASC would silently regress workflow-reopen UX (the panel would + // restore from a stale notebook). + val first = + """{"cells":[{"cell_type":"code","metadata":{},"source":"v1"}]}""" + val second = + """{"cells":[{"cell_type":"code","metadata":{},"source":"v2"}]}""" + + NotebookMigrationResource.storeNotebookAndMapping(storePayload(first, sampleMapping)) + NotebookMigrationResource.storeNotebookAndMapping(storePayload(second, sampleMapping)) + + val entity = NotebookMigrationResource.fetchNotebookAndMapping(fetchPayload()).getEntity.toString + entity should include("\"v2\"") + entity should not include "\"v1\"" + } +}