Skip to content

bug: `get_job` returns COMPLETED status for FAILED jobs #314

@kevinpilch

Description

@kevinpilch

Describe the bug
While implementing my own S3-to-AWS-RDS-Postgres pipeline, some of my initial jobs failed because of wrong credentials. In the web UI, these jobs show the status FAILED. When I query the job status via the `unstructured_client.jobs.get_job()` method, however, I get a `JobStatus.COMPLETED` status. When I query via `unstructured_client.jobs.get_job_details()`, I get `JobProcessingStatus.FAILED` and can see the failure count in the destination node.

To Reproduce
Build and run a workflow with a Postgres destination, for example:

def _build_s3_postgres_workflow(
        self, embeddings_create_request: EmbeddingsCreateRequest
    ) -> CreateWorkflow:
        """Assemble the S3 -> Postgres embedding workflow definition.

        Registers an S3 source connector for the request's bucket/prefix and a
        Postgres destination connector. It then wires a partition -> chunk ->
        embed node pipeline between them. This method only builds the
        ``CreateWorkflow`` object; it does not submit it.

        Args:
            embeddings_create_request: Supplies ``s3_bucket`` and ``s3_prefix``
                for the source connector.

        Returns:
            A ``CreateWorkflow`` referencing the two newly created connectors.
        """
        # Both connectors are created via API calls right here; keep the
        # source-before-destination order of those side effects.
        source = self._configure_s3_unstructured_source(
            bucket_name=embeddings_create_request.s3_bucket,
            prefix=embeddings_create_request.s3_prefix,
        )
        destination = self._configure_postgres_data_destination()

        # Processing pipeline, in execution order.
        pipeline = [
            WorkflowNode(
                name="Partitioner",
                subtype="vlm",
                type="partition",
                settings={
                    "provider": "anthropic",
                    "model": "claude-sonnet-4-5-20250929",
                },
            ),
            WorkflowNode(
                name="Chunker",
                subtype="chunk_by_title",
                type="chunk",
                settings={
                    "new_after_n_chars": 1000,
                    "max_characters": 4096,
                    "overlap": 150,
                },
            ),
            WorkflowNode(
                name="Embedder",
                subtype="azure_openai",
                type="embed",
                settings={"model_name": "text-embedding-3-large"},
            ),
        ]

        return CreateWorkflow(
            name="S3 Knowledge Base to PostgreSQL Embedding Workflow",
            source_id=source.source_connector_information.id,
            destination_id=destination.destination_connector_information.id,
            workflow_type=WorkflowType.CUSTOM,
            workflow_nodes=pipeline,
        )

    def _configure_s3_unstructured_source(
        self, bucket_name: str, prefix: str
    ) -> CreateSourceResponse:
        """Create a non-recursive S3 source connector for ``bucket_name``/``prefix``.

        AWS credentials (key, secret, session token) come from
        ``self._get_unstructured_aws_credentials()``. The remote URL is built by
        ``self._create_s3_uri``.

        Returns:
            The ``CreateSourceResponse`` returned by ``sources.create_source``.
        """
        credentials = self._get_unstructured_aws_credentials()
        access_key_id, secret_access_key, session_token = credentials

        s3_config = {
            "key": access_key_id,
            "secret": secret_access_key,
            "token": session_token,
            "remote_url": self._create_s3_uri(bucket_name, prefix),
            # Only objects directly under the prefix are ingested.
            "recursive": False,
        }
        connector = CreateSourceConnector(
            name="S3 Studio Knowledge Source",
            type=SourceConnectorType.S3,
            config=s3_config,
        )
        return self.unstructured_client.sources.create_source(
            request=CreateSourceRequest(create_source_connector=connector)
        )

    def _configure_postgres_data_destination(self) -> CreateDestinationResponse:
        """Create a Postgres destination connector for the embeddings table.

        Connection details are read from the ``POSTGRES_HOST``, ``POSTGRES_DB``,
        ``POSTGRES_PORT`` and ``POSTGRES_EMBEDDINGS_TABLE_NAME`` environment
        variables. The username and password come from
        ``self._get_unstructured_postgres_credentials()``. The connection uses
        ``sslmode="require"`` and a batch size of 100.

        Returns:
            The ``CreateDestinationResponse`` from ``destinations.create_destination``.

        Raises:
            KeyError: If any of the required environment variables is unset.
        """
        username, password = self._get_unstructured_postgres_credentials()
        env = os.environ

        postgres_config = {
            "host": env["POSTGRES_HOST"],
            "database": env["POSTGRES_DB"],
            # NOTE(review): the port is forwarded as the raw env *string*;
            # confirm the API accepts that or convert it with int().
            "port": env["POSTGRES_PORT"],
            "username": username,
            "password": password,
            "table_name": env["POSTGRES_EMBEDDINGS_TABLE_NAME"],
            "batch_size": 100,
            "sslmode": "require",
        }
        connector = CreateDestinationConnector(
            name="Postgres Studio Knowledge Destination",
            type=DestinationConnectorType.POSTGRES,
            config=postgres_config,
        )
        return self.unstructured_client.destinations.create_destination(
            request=CreateDestinationRequest(create_destination_connector=connector)
        )

Misconfigure the destination password so that the workflow runs but fails. Then query the job status by its job ID:

# Reproduction: fetch the job via the plain `get_job` endpoint and print the
# status it reports. Per this issue, the printed value is JobStatus.COMPLETED
# even though the web UI and `get_job_details()` report the job as FAILED.
job_request = GetJobRequest(job_id="91cf8c87-b00b-43d0-8c12-86ddf9b397b0")
response = unstructured_client.jobs.get_job(request=job_request)

job = response.job_information
status = job.status
print(status)

Output: `JobStatus.COMPLETED`

In the web UI, the same job is shown with status FAILED:

Image

Expected behavior
`unstructured_client.jobs.get_job()` should return a `JobStatus.FAILED` status (consistent with the web UI and `get_job_details()`) and a non-200 status code.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions