
Ingestion Pipeline

An IngestionPipeline applies a sequence of Transformations to your input data. The resulting nodes are either returned or, if a vector store is given, inserted into it.
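Conceptually, each Transformation maps an array of nodes to a new array, and the pipeline threads its input through the transformations in order. A minimal sketch of that idea using plain functions (not the library's actual types):

```typescript
// Conceptual sketch: a transformation maps an array of nodes to a new array.
type Node = { text: string };
type Transformation = (nodes: Node[]) => Node[];

// Example transformation: split each node's text into sentence-sized pieces.
const splitter: Transformation = (nodes) =>
  nodes.flatMap((n) =>
    n.text
      .split(". ")
      .filter(Boolean)
      .map((text) => ({ text })),
  );

// A pipeline simply threads its input through each transformation in order.
const runPipeline = (transformations: Transformation[], nodes: Node[]) =>
  transformations.reduce((acc, t) => t(acc), nodes);

const nodes = runPipeline([splitter], [
  { text: "First sentence. Second sentence" },
]);
console.log(nodes.length); // 2
```

In the real pipeline, transformations such as SentenceSplitter and the embedding step are objects rather than bare functions, but the data flow is the same.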

npm add @vectorstores/core @vectorstores/qdrant openai

The simplest usage is to instantiate an IngestionPipeline like so:

import fs from "node:fs/promises";
import { fileURLToPath } from "node:url";
import {
  BaseEmbedding,
  Document,
  IngestionPipeline,
  MetadataMode,
  SentenceSplitter,
  Settings,
  VectorStoreIndex,
} from "@vectorstores/core";
import { OpenAI } from "openai";

async function main() {
  // Configure the embedding function used by BaseEmbedding()
  const openai = new OpenAI();
  Settings.embedFunc = async (input: string[]): Promise<number[][]> => {
    const { data } = await openai.embeddings.create({
      model: "text-embedding-3-small",
      input,
    });
    return data.map((d) => d.embedding);
  };

  // Load the essay from abramov.txt in Node
  const filePath = fileURLToPath(
    new URL("./data/abramov.txt", import.meta.url),
  );
  const essay = await fs.readFile(filePath, "utf-8");

  // Create a Document object from the essay
  const document = new Document({ text: essay, id_: filePath });
  const pipeline = new IngestionPipeline({
    transformations: [
      new SentenceSplitter({ chunkSize: 1024, chunkOverlap: 20 }),
      new BaseEmbedding(),
    ],
  });

  console.time("Pipeline Run Time");
  // Run the pipeline
  const nodes = await pipeline.run({ documents: [document] });
  console.timeEnd("Pipeline Run Time");

  // Initialize a VectorStoreIndex from the nodes
  const index = await VectorStoreIndex.init({ nodes });

  // Retrieve from the index
  const retriever = index.asRetriever();
  const response = await retriever.retrieve({
    query: "What did the author do in college?",
  });

  // Print the results
  for (const result of response) {
    console.log("Score:", result.score ?? "-");
    console.log(result.node.getContent(MetadataMode.NONE));
    console.log("---");
  }
}

main().catch(console.error);
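The SentenceSplitter above produces chunks of roughly chunkSize tokens, with consecutive chunks sharing chunkOverlap tokens so that context is not lost at chunk boundaries. A rough character-based sketch of sliding-window chunking with overlap (the real splitter respects sentence and token boundaries):

```typescript
// Rough sketch of sliding-window chunking with overlap. Character-based
// for simplicity; the real SentenceSplitter works on sentences and tokens.
function chunk(text: string, chunkSize: number, chunkOverlap: number): string[] {
  const step = chunkSize - chunkOverlap;
  const chunks: string[] = [];
  for (let start = 0; start < text.length; start += step) {
    chunks.push(text.slice(start, start + chunkSize));
    if (start + chunkSize >= text.length) break;
  }
  return chunks;
}

// 25 characters, chunkSize 10, chunkOverlap 2: windows start at 0, 8, 16,
// so we get 3 chunks, and adjacent chunks share their last/first 2 characters.
const chunks = chunk("abcdefghijklmnopqrstuvwxy", 10, 2);
console.log(chunks.length); // 3
```

A larger chunkOverlap gives retrieval more shared context between neighboring chunks at the cost of storing and embedding more duplicated text.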

When running an ingestion pipeline, you can also choose to automatically insert the resulting nodes into a remote vector store.

Then, you can construct an index from that vector store later on.

import fs from "node:fs/promises";
import { fileURLToPath } from "node:url";
import { QdrantVectorStore } from "@vectorstores/qdrant";
import {
  BaseEmbedding,
  Document,
  IngestionPipeline,
  SentenceSplitter,
  Settings,
  VectorStoreIndex,
} from "@vectorstores/core";
import { OpenAI } from "openai";

async function main() {
  // Configure the embedding function used by BaseEmbedding()
  const openai = new OpenAI();
  Settings.embedFunc = async (input: string[]): Promise<number[][]> => {
    const { data } = await openai.embeddings.create({
      model: "text-embedding-3-small",
      input,
    });
    return data.map((d) => d.embedding);
  };

  // Load the essay from abramov.txt in Node
  const filePath = fileURLToPath(
    new URL("./data/abramov.txt", import.meta.url),
  );
  const essay = await fs.readFile(filePath, "utf-8");

  const vectorStore = new QdrantVectorStore({
    host: "http://localhost:6333",
  });

  // Create a Document object from the essay
  const document = new Document({ text: essay, id_: filePath });
  const pipeline = new IngestionPipeline({
    transformations: [
      new SentenceSplitter({ chunkSize: 1024, chunkOverlap: 20 }),
      new BaseEmbedding(),
    ],
    vectorStore,
  });

  // Run the pipeline; the resulting nodes are inserted into the vector store
  await pipeline.run({ documents: [document] });

  // Create an index over the vector store for retrieval later on
  const index = VectorStoreIndex.fromVectorStore(vectorStore);
}

main().catch(console.error);
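Conceptually, the vector store persists embedding/node pairs, and retrieval over the resulting index is a nearest-neighbor search of the stored embeddings against the query embedding. A toy in-memory sketch of that idea using cosine similarity (not Qdrant's actual API):

```typescript
// Toy in-memory vector store: retrieval is a cosine-similarity search.
type Entry = { embedding: number[]; text: string };

const dot = (a: number[], b: number[]) => a.reduce((s, x, i) => s + x * b[i], 0);
const norm = (a: number[]) => Math.sqrt(dot(a, a));
const cosine = (a: number[], b: number[]) => dot(a, b) / (norm(a) * norm(b));

// Return the topK entries most similar to the query embedding.
function retrieve(store: Entry[], query: number[], topK = 2): Entry[] {
  return [...store]
    .sort((a, b) => cosine(b.embedding, query) - cosine(a.embedding, query))
    .slice(0, topK);
}

const store: Entry[] = [
  { embedding: [1, 0], text: "about college" },
  { embedding: [0, 1], text: "about work" },
];
console.log(retrieve(store, [0.9, 0.1], 1)[0].text); // "about college"
```

A real vector store like Qdrant does the same kind of similarity search server-side, using approximate nearest-neighbor indexes so it scales far beyond an in-memory array.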