diff --git a/plugins/_office/helpers/artifact_editor.py b/plugins/_office/helpers/artifact_editor.py index 65d7df8d7..3b949f6d8 100644 --- a/plugins/_office/helpers/artifact_editor.py +++ b/plugins/_office/helpers/artifact_editor.py @@ -61,19 +61,20 @@ def edit_artifact( sheet: str = "", cells: Any = None, rows: Any = None, + chart: Any = None, slides: Any = None, **kwargs: Any, ) -> tuple[dict[str, Any], dict[str, Any]]: """Apply a direct saved edit to an Office artifact and return updated metadata.""" path = Path(doc["path"]) ext = str(doc["extension"]).lower() - op = normalize_operation(operation, content=content, find=find, cells=cells, rows=rows, slides=slides) + op = normalize_operation(operation, content=content, find=find, cells=cells, rows=rows, chart=chart, slides=slides) before = path.read_bytes() if ext == "docx": updated, details = _edit_docx(before, op, content=content, find=find, replace=replace, **kwargs) elif ext == "xlsx": - updated, details = _edit_xlsx(path, op, content=content, find=find, replace=replace, sheet=sheet, cells=cells, rows=rows, **kwargs) + updated, details = _edit_xlsx(path, op, content=content, find=find, replace=replace, sheet=sheet, cells=cells, rows=rows, chart=chart, **kwargs) elif ext == "pptx": updated, details = _edit_pptx(before, op, content=content, find=find, replace=replace, slides=slides, **kwargs) else: @@ -104,6 +105,7 @@ def normalize_operation( find: str = "", cells: Any = None, rows: Any = None, + chart: Any = None, slides: Any = None, ) -> str: op = str(operation or "").strip().lower().replace("-", "_") @@ -119,6 +121,10 @@ def normalize_operation( "set_sheet": "set_rows", "write_sheet": "set_rows", "add_rows": "append_rows", + "add_chart": "create_chart", + "chart": "create_chart", + "insert_chart": "create_chart", + "set_chart": "create_chart", "add_slide": "append_slide", "set_deck": "set_slides", } @@ -129,6 +135,8 @@ def normalize_operation( return "set_cells" if rows: return "append_rows" + if chart: + return "create_chart" if slides: return "set_slides" if find: @@ -167,10 +175,13 @@ def _read_xlsx(path: Path) -> dict[str, Any]: values = ["" if value is None else value for value in row] if any(str(value).strip() for value in values): rows.append(values) + charts = [_chart_summary(chart) for chart in getattr(worksheet, "_charts", [])[:20]] sheets.append({ "name": worksheet.title, "max_row": worksheet.max_row, "max_column": worksheet.max_column, + "chart_count": len(getattr(worksheet, "_charts", [])), + "charts": charts, "preview_rows": rows, }) return { @@ -278,9 +289,10 @@ def _edit_xlsx( sheet: str = "", cells: Any = None, rows: Any = None, + chart: Any = None, **kwargs: Any, ) -> tuple[bytes, dict[str, Any]]: - if op not in {"set_text", "set_rows", "append_text", "append_rows", "set_cells", "replace_text", "delete_text"}: + if op not in {"set_text", "set_rows", "append_text", "append_rows", "set_cells", "replace_text", "delete_text", "create_chart"}: raise ValueError(f"Unsupported XLSX operation: {op}") openpyxl = _require_openpyxl() workbook = openpyxl.load_workbook(path) @@ -327,12 +339,536 @@ def _edit_xlsx( details["replacements"] = count if count == 0: return path.read_bytes(), details + elif op == "create_chart": + chart_details = [] + for chart_spec in _normalize_chart_specs(chart, kwargs): + chart_details.append(_create_xlsx_chart(workbook, worksheet, chart_spec)) + details["charts_created"] = len(chart_details) + details["charts"] = chart_details buffer = io.BytesIO() workbook.save(buffer) return buffer.getvalue(), details +_CHART_SPEC_KEYS = { + "anchor", + "categories", + "chart_type", + "close", + "data_range", + "fields", + "from_rows", + "height", + "high", + "include_headers", + "labels", + "legend", + "low", + "open", + "position", + "replace_existing", + "series", + "sheet", + "style", + "title", + "titles_from_data", + "type", + "values", + "width", + "x_axis_title", + "xvalues", + "y_axis_title", + "yvalues", +} + +_CHART_TYPE_ALIASES = { + "area": "area", + "bar": "bar", + "candlestick": "stock", + "col": "column", + "column": "column", + "columns": "column", + "line": "line", + "ohlc": "stock", + "pie": "pie", + "scatter": "scatter", + "stock": "stock", +} + + +def _normalize_chart_specs(chart: Any, kwargs: dict[str, Any]) -> list[dict[str, Any]]: + parsed = _parse_chart_value(chart) + if isinstance(parsed, list): + if not parsed: + raise ValueError("chart list must include at least one chart spec") + return [_normalize_chart_spec(item, {}) for item in parsed] + if parsed is None: + parsed = {} + if not isinstance(parsed, dict): + raise ValueError("chart must be an object, JSON object, or list of chart objects") + return [_normalize_chart_spec(parsed, kwargs)] + + +def _parse_chart_value(value: Any) -> Any: + if value is None or value == "": + return None + if isinstance(value, str): + stripped = value.strip() + if not stripped: + return None + if stripped.startswith("{") or stripped.startswith("["): + return json.loads(stripped) + return {"type": stripped} + return value + + +def _normalize_chart_spec(value: Any, kwargs: dict[str, Any]) -> dict[str, Any]: + if isinstance(value, str): + value = _parse_chart_value(value) + if value is None: + value = {} + if not isinstance(value, dict): + raise ValueError("each chart spec must be an object") + + spec = dict(value) + explicit_include_headers = "include_headers" in spec or "titles_from_data" in spec + for key in _CHART_SPEC_KEYS: + if key in kwargs and kwargs[key] not in (None, ""): + spec[key] = kwargs[key] + if key in {"include_headers", "titles_from_data"}: + explicit_include_headers = True + + explicit_type = bool(spec.get("type") or spec.get("chart_type")) + chart_type = str(spec.get("type") or spec.get("chart_type") or "").strip().lower().replace("-", "_") + if chart_type: + chart_type = _CHART_TYPE_ALIASES.get(chart_type, chart_type) + spec["type"] = chart_type + spec["_explicit_type"] = explicit_type + spec["position"] = str(spec.get("position") or spec.get("anchor") or "H2") + spec["include_headers"] = _bool_value(spec.get("include_headers", spec.get("titles_from_data")), default=True) + spec["_include_headers_explicit"] = explicit_include_headers + spec["from_rows"] = _bool_value(spec.get("from_rows"), default=False) + spec["replace_existing"] = _bool_value(spec.get("replace_existing"), default=False) + spec["width"] = _float_or_default(spec.get("width"), 18.0) + spec["height"] = _float_or_default(spec.get("height"), 10.0) + return spec + + +def _create_xlsx_chart(workbook: Any, default_worksheet: Any, spec: dict[str, Any]) -> dict[str, Any]: + openpyxl = _require_openpyxl() + worksheet = _worksheet(workbook, str(spec.get("sheet") or default_worksheet.title)) + chart_type = spec["type"] or _infer_default_chart_type(worksheet) + if chart_type not in _CHART_TYPE_ALIASES.values(): + raise ValueError(f"Unsupported XLSX chart type: {chart_type}") + + if spec["replace_existing"]: + charts_removed = len(getattr(worksheet, "_charts", [])) + worksheet._charts = [] + else: + charts_removed = 0 + + if chart_type == "stock": + chart, data_range, categories = _stock_chart(openpyxl, workbook, worksheet, spec) + elif chart_type == "scatter": + chart, data_range, categories = _scatter_chart(openpyxl, workbook, worksheet, spec) + else: + chart, data_range, categories = _standard_chart(openpyxl, workbook, worksheet, spec, chart_type) + + _apply_chart_options(chart, spec) + worksheet.add_chart(chart, spec["position"]) + return { + "type": chart_type, + "title": str(spec.get("title") or ""), + "sheet": worksheet.title, + "position": spec["position"], + "data_range": data_range, + "categories": categories, + "series_count": len(getattr(chart, "series", [])), + "charts_removed": charts_removed, + } + + +def _standard_chart(openpyxl: Any, workbook: Any, worksheet: Any, spec: dict[str, Any], chart_type: str) -> tuple[Any, str, str]: + chart_classes = { + "area": openpyxl.chart.AreaChart, + "bar": openpyxl.chart.BarChart, + "column": openpyxl.chart.BarChart, + "line": openpyxl.chart.LineChart, + "pie": openpyxl.chart.PieChart, + } + chart = chart_classes[chart_type]() + if chart_type == "bar": + chart.type = "bar" + elif chart_type == "column": + chart.type = "col" + + include_headers = bool(spec["include_headers"]) + categories = str(spec.get("categories") or spec.get("labels") or "") + if spec.get("series"): + data_range = _add_explicit_series(openpyxl, chart, workbook, worksheet, spec, validate_numeric=True) + else: + range_value = spec.get("values") or spec.get("data_range") or _default_data_range(worksheet, chart_type, include_headers) + include_headers = _include_headers_for_range(spec, range_value) + data_ref, data_sheet, bounds, data_range = _reference_from_range(openpyxl, workbook, worksheet, range_value) + _validate_numeric_series(data_sheet, bounds, include_headers=include_headers, label=data_range) + chart.add_data(data_ref, titles_from_data=include_headers, from_rows=bool(spec["from_rows"])) + + if not categories: + categories = _default_category_range(worksheet, include_headers=include_headers) + if categories: + categories_ref, _, _, categories = _reference_from_range(openpyxl, workbook, worksheet, categories) + chart.set_categories(categories_ref) + return chart, data_range, categories + + +def _stock_chart(openpyxl: Any, workbook: Any, worksheet: Any, spec: dict[str, Any]) -> tuple[Any, str, str]: + chart = openpyxl.chart.StockChart() + field_ranges = _stock_field_ranges(spec) + + if field_ranges: + data_labels = [] + for label in ("open", "high", "low", "close"): + include_headers = _include_headers_for_range(spec, field_ranges[label]) + series_ref, data_sheet, bounds, label_range = _reference_from_range(openpyxl, workbook, worksheet, field_ranges[label]) + _validate_numeric_series(data_sheet, bounds, include_headers=include_headers, label=label) + chart.series.append(openpyxl.chart.Series(series_ref, title_from_data=include_headers)) + data_labels.append(label_range) + data_range = ", ".join(data_labels) + elif spec.get("series"): + include_headers = bool(spec["include_headers"]) + data_range = _add_explicit_series(openpyxl, chart, workbook, worksheet, spec, expected_count=4, validate_numeric=True) + else: + include_headers = bool(spec["include_headers"]) + range_value = spec.get("data_range") or _default_data_range(worksheet, "stock", include_headers) + include_headers = _include_headers_for_range(spec, range_value) + _, data_sheet, bounds, range_label = _reference_from_range(openpyxl, workbook, worksheet, range_value) + min_col, min_row, max_col, max_row = bounds + columns = list(range(min_col, max_col + 1)) + if len(columns) > 4 and _looks_like_category_header(data_sheet.cell(row=min_row, column=min_col).value): + columns = columns[1:5] + else: + columns = columns[:4] + if len(columns) != 4: + raise ValueError("stock charts require exactly four Open, High, Low, Close data series") + _validate_stock_headers(data_sheet, columns, min_row, include_headers=include_headers) + for column in columns: + _validate_numeric_series(data_sheet, (column, min_row, column, max_row), include_headers=include_headers, label=data_sheet.cell(row=min_row, column=column).value or _column_letter(column)) + series_ref = openpyxl.chart.Reference(data_sheet, min_col=column, min_row=min_row, max_col=column, max_row=max_row) + chart.series.append(openpyxl.chart.Series(series_ref, title_from_data=include_headers)) + data_range = range_label + + categories = str(spec.get("categories") or spec.get("labels") or _default_category_range(worksheet, include_headers=bool(spec["include_headers"]))) + if categories: + categories_ref, _, _, categories = _reference_from_range(openpyxl, workbook, worksheet, categories) + chart.set_categories(categories_ref) + chart.hiLowLines = openpyxl.chart.axis.ChartLines() + chart.upDownBars = openpyxl.chart.updown_bars.UpDownBars() + return chart, data_range, categories + + +def _scatter_chart(openpyxl: Any, workbook: Any, worksheet: Any, spec: dict[str, Any]) -> tuple[Any, str, str]: + chart = openpyxl.chart.ScatterChart() + include_headers = bool(spec["include_headers"]) + categories = str(spec.get("xvalues") or spec.get("categories") or _default_category_range(worksheet, include_headers=include_headers)) + x_ref, x_sheet, x_bounds, categories = _reference_from_range(openpyxl, workbook, worksheet, categories) + if include_headers and x_bounds[1] == 1 and x_bounds[3] > 1: + x_ref = openpyxl.chart.Reference(x_sheet, min_col=x_bounds[0], min_row=2, max_col=x_bounds[2], max_row=x_bounds[3]) + + data_ranges = [] + series_items = _series_items(spec) + if series_items: + for item in series_items: + values_ref, title, data_range = _series_values_reference(openpyxl, workbook, worksheet, item, include_headers=include_headers, validate_numeric=True) + xvalues = item.get("xvalues") or item.get("x") or item.get("categories") + if xvalues: + item_x_ref, item_x_sheet, item_x_bounds, _ = _reference_from_range(openpyxl, workbook, worksheet, xvalues) + if include_headers and item_x_bounds[1] == 1 and item_x_bounds[3] > 1: + item_x_ref = openpyxl.chart.Reference(item_x_sheet, min_col=item_x_bounds[0], min_row=2, max_col=item_x_bounds[2], max_row=item_x_bounds[3]) + else: + item_x_ref = x_ref + chart.series.append(openpyxl.chart.Series(values_ref, xvalues=item_x_ref, title=title)) + data_ranges.append(data_range) + else: + range_value = spec.get("yvalues") or spec.get("values") or spec.get("data_range") or _default_data_range(worksheet, "scatter", include_headers) + _, data_sheet, bounds, data_range = _reference_from_range(openpyxl, workbook, worksheet, range_value) + min_col, min_row, max_col, max_row = bounds + first_row = min_row + 1 if include_headers and min_row == 1 and max_row > 1 else min_row + for column in range(min_col, max_col + 1): + title = data_sheet.cell(row=min_row, column=column).value if first_row > min_row else None + _validate_numeric_series(data_sheet, (column, min_row, column, max_row), include_headers=include_headers, label=title or _column_letter(column)) + y_ref = openpyxl.chart.Reference(data_sheet, min_col=column, min_row=first_row, max_col=column, max_row=max_row) + chart.series.append(openpyxl.chart.Series(y_ref, xvalues=x_ref, title=str(title) if title is not None else None)) + return chart, ", ".join(data_ranges) if data_ranges else data_range, categories + + +def _add_explicit_series( + openpyxl: Any, + chart: Any, + workbook: Any, + worksheet: Any, + spec: dict[str, Any], + expected_count: int | None = None, + validate_numeric: bool = False, +) -> str: + include_headers = bool(spec["include_headers"]) + ranges = [] + for item in _series_items(spec): + values_ref, title, label = _series_values_reference(openpyxl, workbook, worksheet, item, include_headers=include_headers, validate_numeric=validate_numeric) + if title: + chart.series.append(openpyxl.chart.Series(values_ref, title=title)) + else: + chart.series.append(openpyxl.chart.Series(values_ref, title_from_data=include_headers)) + ranges.append(label) + if expected_count is not None and len(ranges) != expected_count: + raise ValueError(f"chart requires exactly {expected_count} series") + return ", ".join(ranges) + + +def _series_items(spec: dict[str, Any]) -> list[dict[str, Any]]: + raw = spec.get("series") or [] + if isinstance(raw, str): + raw = json.loads(raw) + if not isinstance(raw, list): + raise ValueError("chart series must be a list") + items = [] + for item in raw: + if isinstance(item, str): + items.append({"values": item}) + elif isinstance(item, dict): + items.append(item) + else: + raise ValueError("chart series entries must be objects or range strings") + return items + + +def _series_values_reference( + openpyxl: Any, + workbook: Any, + worksheet: Any, + item: dict[str, Any], + *, + include_headers: bool, + validate_numeric: bool = False, +) -> tuple[Any, str | None, str]: + values = item.get("values") or item.get("range") or item.get("yvalues") or item.get("y") + if not values: + raise ValueError("chart series entries require values or range") + ref, data_sheet, bounds, label = _reference_from_range(openpyxl, workbook, worksheet, values) + title = item.get("title") or item.get("name") + min_col, min_row, max_col, max_row = bounds + if include_headers and min_row == 1 and max_col == min_col and max_row > min_row: + title = title if title is not None else data_sheet.cell(row=min_row, column=min_col).value + ref = openpyxl.chart.Reference(data_sheet, min_col=min_col, min_row=min_row + 1, max_col=max_col, max_row=max_row) + if validate_numeric: + _validate_numeric_series(data_sheet, bounds, include_headers=include_headers, label=title or label) + return ref, str(title) if title is not None else None, label + + +def _stock_field_ranges(spec: dict[str, Any]) -> dict[str, Any]: + fields = spec.get("fields") or {} + if isinstance(fields, str): + fields = json.loads(fields) + if not isinstance(fields, dict): + raise ValueError("stock chart fields must be an object") + result = {} + for label in ("open", "high", "low", "close"): + value = spec.get(label) or fields.get(label) + if value: + result[label] = value + if result and set(result) != {"open", "high", "low", "close"}: + raise ValueError("stock chart fields must include open, high, low, and close") + return result + + +def _reference_from_range(openpyxl: Any, workbook: Any, default_worksheet: Any, value: Any) -> tuple[Any, Any, tuple[int, int, int, int], str]: + sheet_name, cell_range = _split_range_ref(value, default_worksheet.title) + worksheet = _worksheet(workbook, sheet_name) + min_col, min_row, max_col, max_row = openpyxl.utils.cell.range_boundaries(cell_range) + reference = openpyxl.chart.Reference(worksheet, min_col=min_col, min_row=min_row, max_col=max_col, max_row=max_row) + return reference, worksheet, (min_col, min_row, max_col, max_row), _range_label(openpyxl, worksheet.title, min_col, min_row, max_col, max_row) + + +def _split_range_ref(value: Any, default_sheet: str) -> tuple[str, str]: + if isinstance(value, dict): + sheet = str(value.get("sheet") or default_sheet) + min_cell = value.get("range") or value.get("ref") + if min_cell: + return _split_range_ref(str(min_cell), sheet) + min_col = value.get("min_col") + min_row = value.get("min_row") + max_col = value.get("max_col", min_col) + max_row = value.get("max_row", min_row) + if min_col is None or min_row is None: + raise ValueError("range objects require range/ref or min_col and min_row") + return sheet, f"{_cell_ref(min_col, min_row)}:{_cell_ref(max_col, max_row)}" + ref = str(value or "").strip() + if not ref: + raise ValueError("chart range is required") + if "!" not in ref: + return default_sheet, ref + sheet, cell_range = ref.rsplit("!", 1) + return sheet.strip().strip("'") or default_sheet, cell_range + + +def _range_label(openpyxl: Any, sheet_title: str, min_col: int, min_row: int, max_col: int, max_row: int) -> str: + start = f"{openpyxl.utils.cell.get_column_letter(min_col)}{min_row}" + end = f"{openpyxl.utils.cell.get_column_letter(max_col)}{max_row}" + sheet = sheet_title if re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", sheet_title) else f"'{sheet_title}'" + return f"{sheet}!{start}:{end}" + + +def _include_headers_for_range(spec: dict[str, Any], range_value: Any) -> bool: + include_headers = bool(spec["include_headers"]) + if spec.get("_include_headers_explicit"): + return include_headers + try: + _, cell_range = _split_range_ref(range_value, "") + _, min_row, _, _ = __import__("openpyxl").utils.cell.range_boundaries(cell_range) + except Exception: + return include_headers + return include_headers and min_row == 1 + + +def _validate_stock_headers(data_sheet: Any, columns: list[int], min_row: int, *, include_headers: bool) -> None: + if not include_headers or min_row != 1: + return + expected = ["open", "high", "low", "close"] + found = [str(data_sheet.cell(row=min_row, column=column).value or "").strip().lower() for column in columns] + if found != expected: + raise ValueError(f"stock charts require Open, High, Low, Close columns in order; found {found}") + + +def _validate_numeric_series(data_sheet: Any, bounds: tuple[int, int, int, int], *, include_headers: bool, label: Any) -> None: + min_col, min_row, max_col, max_row = bounds + start_row = min_row + 1 if include_headers and min_row == 1 and max_row > min_row else min_row + values = [ + data_sheet.cell(row=row, column=column).value + for column in range(min_col, max_col + 1) + for row in range(start_row, max_row + 1) + ] + numeric_count = sum(1 for value in values if isinstance(value, (int, float)) and not isinstance(value, bool)) + if numeric_count == 0: + name = str(label or _range_label(__import__("openpyxl"), data_sheet.title, min_col, min_row, max_col, max_row)) + raise ValueError(f"chart series '{name}' has no numeric data") + + +def _default_category_range(worksheet: Any, *, include_headers: bool) -> str: + if (worksheet.max_column or 0) < 2 or (worksheet.max_row or 0) < 2: + return "" + first_row_is_header = _looks_like_category_header(worksheet.cell(row=1, column=1).value) + return f"A{2 if include_headers or first_row_is_header else 1}:A{worksheet.max_row}" + + +def _default_data_range(worksheet: Any, chart_type: str, include_headers: bool) -> str: + max_row = worksheet.max_row or 1 + max_col = worksheet.max_column or 1 + if chart_type == "stock": + if max_col < 5 or max_row < 2: + raise ValueError("stock charts need Date, Open, High, Low, Close columns or explicit ranges") + return f"B{1 if include_headers else 2}:E{max_row}" + if chart_type == "pie" and max_col >= 2: + return f"B{1 if include_headers else 2}:B{max_row}" + start_col = 2 if max_col >= 2 else 1 + return f"{_column_letter(start_col)}{1 if include_headers else 2}:{_column_letter(max_col)}{max_row}" + + +def _cell_ref(column: Any, row: Any) -> str: + return f"{_column_letter(column)}{int(row)}" + + +def _column_letter(column: Any) -> str: + if isinstance(column, str) and column.isalpha(): + return column.upper() + return __import__("openpyxl").utils.cell.get_column_letter(int(column)) + + +def _infer_default_chart_type(worksheet: Any) -> str: + headers = [str(worksheet.cell(row=1, column=column).value or "").strip().lower() for column in range(1, (worksheet.max_column or 0) + 1)] + if {"open", "high", "low", "close"}.issubset(set(headers)): + return "stock" + return "line" + + +def _looks_like_category_header(value: Any) -> bool: + return str(value or "").strip().lower() in {"date", "time", "category", "label", "month", "year"} + + +def _apply_chart_options(chart: Any, spec: dict[str, Any]) -> None: + if spec.get("title"): + chart.title = str(spec["title"]) + if spec.get("style") not in (None, ""): + chart.style = int(spec["style"]) + chart.width = spec["width"] + chart.height = spec["height"] + if _bool_value(spec.get("legend"), default=True) is False: + chart.legend = None + if spec.get("x_axis_title") and hasattr(chart, "x_axis"): + chart.x_axis.title = str(spec["x_axis_title"]) + if spec.get("y_axis_title") and hasattr(chart, "y_axis"): + chart.y_axis.title = str(spec["y_axis_title"]) + + +def _chart_summary(chart: Any) -> dict[str, Any]: + return { + "type": _chart_kind(chart), + "title": _chart_title(chart), + "anchor": _chart_anchor(chart), + "series_count": len(getattr(chart, "series", [])), + } + + +def _chart_kind(chart: Any) -> str: + name = chart.__class__.__name__.replace("Chart", "").lower() + return {"bar": "bar_or_column", "stock": "stock"}.get(name, name) + + +def _chart_title(chart: Any) -> str: + title = getattr(chart, "title", None) + if title is None or isinstance(title, str): + return title or "" + try: + paragraphs = title.tx.rich.p + parts = [] + for paragraph in paragraphs: + for run in paragraph.r: + if run.t: + parts.append(run.t) + return "".join(parts) + except Exception: + return "" + + +def _chart_anchor(chart: Any) -> str: + openpyxl = _require_openpyxl() + anchor = getattr(chart, "anchor", "") + if isinstance(anchor, str): + return anchor + marker = getattr(anchor, "_from", None) + if marker is None: + return "" + return f"{openpyxl.utils.cell.get_column_letter(marker.col + 1)}{marker.row + 1}" + + +def _bool_value(value: Any, default: bool = False) -> bool: + if value in (None, ""): + return default + if isinstance(value, bool): + return value + if isinstance(value, (int, float)): + return bool(value) + return str(value).strip().lower() not in {"0", "false", "no", "off", "none"} + + +def _float_or_default(value: Any, default: float) -> float: + if value in (None, ""): + return default + try: + return float(value) + except (TypeError, ValueError): + return default + + def _edit_pptx(before: bytes, op: str, *, content: str = "", find: str = "", replace: str = "", slides: Any = None, **kwargs: Any) -> tuple[bytes, dict[str, Any]]: if op not in {"set_text", "set_slides", "append_text", "append_slide", "replace_text", "delete_text"}: raise ValueError(f"Unsupported PPTX operation: {op}") diff --git a/plugins/_office/helpers/wopi_store.py b/plugins/_office/helpers/wopi_store.py index a3c0da6ec..d7a738727 100644 --- a/plugins/_office/helpers/wopi_store.py +++ b/plugins/_office/helpers/wopi_store.py @@ -1,8 +1,11 @@ from __future__ import annotations +import csv import hashlib +import io import json import os +import re import secrets import shutil import sqlite3 @@ -564,8 +567,6 @@ def template_bytes(kind: str, ext: str, title: str, content: str) -> bytes: def _zip_bytes(files_map: dict[str, str | bytes], stored: set[str] | None = None) -> bytes: - import io - buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive: for name, value in files_map.items(): @@ -587,10 +588,10 @@ def _docx(title: str, content: str) -> bytes: def _xlsx(title: str, content: str) -> bytes: - rows = [title] + [line for line in content.splitlines() if line.strip()] + rows = _xlsx_rows(title, content) sheet_rows = "".join( - f'{escape(line)}' - for idx, line in enumerate(rows, start=1) + f'{"".join(_xlsx_cell(row_idx, col_idx, value) for col_idx, value in enumerate(row, start=1))}' + for row_idx, row in enumerate(rows, start=1) ) return _zip_bytes({ "[Content_Types].xml": """""", @@ -601,6 +602,85 @@ def _xlsx(title: str, content: str) -> bytes: }) +def _xlsx_rows(title: str, content: str) -> list[list[Any]]: + parsed = _tabular_rows(content) + if parsed: + return parsed + lines = [line for line in str(content or "").splitlines() if line.strip()] + if lines: + return [[title], *[[line] for line in lines]] + return [[title]] + + +def _tabular_rows(content: str) -> list[list[Any]]: + text = str(content or "").strip("\n") + if not text.strip(): + return [] + lines = [line for line in text.splitlines() if line.strip()] + markdown_rows = _markdown_table_rows(lines) + if markdown_rows: + return markdown_rows + + delimiter = "\t" if any("\t" in line for line in lines) else ("," if any("," in line for line in lines) else None) + if not delimiter: + return [] + return [[_xlsx_value(cell) for cell in row] for row in csv.reader(io.StringIO("\n".join(lines)), delimiter=delimiter)] + + +def _markdown_table_rows(lines: list[str]) -> list[list[Any]]: + table_lines = [line.strip() for line in lines if line.strip().startswith("|") and line.strip().endswith("|")] + if len(table_lines) < 2: + return [] + rows = [] + for line in table_lines: + cells = [cell.strip() for cell in line.strip("|").split("|")] + if all(re.fullmatch(r":?-{3,}:?", cell or "") for cell in cells): + continue + rows.append([_xlsx_value(cell) for cell in cells]) + return rows + + +def _xlsx_cell(row_idx: int, col_idx: int, value: Any) -> str: + ref = f"{_column_name(col_idx)}{row_idx}" + value = _xlsx_value(value) + if value in (None, ""): + return f'' + if isinstance(value, bool): + return f'{1 if value else 0}' + if isinstance(value, (int, float)): + return f'{value}' + return f'{escape(str(value))}' + + +def _xlsx_value(value: Any) -> Any: + if not isinstance(value, str): + return value + stripped = value.strip() + if not stripped: + return "" + if stripped.lower() in {"true", "false"}: + return stripped.lower() == "true" + if re.fullmatch(r"[+-]?\d+", stripped) and not (len(stripped.lstrip("+-")) > 1 and stripped.lstrip("+-").startswith("0")): + try: + return int(stripped) + except ValueError: + return stripped + if re.fullmatch(r"[+-]?(?:\d+\.\d*|\.\d+)(?:[eE][+-]?\d+)?", stripped) or re.fullmatch(r"[+-]?\d+[eE][+-]?\d+", stripped): + try: + return float(stripped) + except ValueError: + return stripped + return stripped + + +def _column_name(index: int) -> str: + name = "" + while index: + index, remainder = divmod(index - 1, 26) + name = chr(65 + remainder) + name + return name + + def _pptx(title: str, content: str) -> bytes: subtitle = content.splitlines()[0] if content.splitlines() else "" return _zip_bytes({ diff --git a/plugins/_office/prompts/agent.system.tool.document_artifact.md b/plugins/_office/prompts/agent.system.tool.document_artifact.md index 4e255f3fb..96786b1b6 100644 --- a/plugins/_office/prompts/agent.system.tool.document_artifact.md +++ b/plugins/_office/prompts/agent.system.tool.document_artifact.md @@ -3,4 +3,7 @@ create/open/read/edit reusable Office artifacts in the Agent Zero canvas formats: docx xlsx pptx odt ods odp methods: create open read edit inspect export version_history restore_version status common args: kind title format content path file_id +XLSX charts: use edit operation `create_chart` with `chart` object instead of code execution for embedded spreadsheet charts +chart types: line bar column pie area scatter stock ohlc candlestick +XLSX create/edit tabular content: CSV, TSV, Markdown tables, or rows arrays become real spreadsheet cells for nontrivial Office artifact work, load skill `office-artifacts` first diff --git a/plugins/_office/skills/office-artifacts/SKILL.md b/plugins/_office/skills/office-artifacts/SKILL.md index 535ddafa3..b241cbafa 100644 --- a/plugins/_office/skills/office-artifacts/SKILL.md +++ b/plugins/_office/skills/office-artifacts/SKILL.md @@ -1,7 +1,7 @@ --- name: office-artifacts description: Use when creating, opening, reading, or editing editable Office canvas artifacts such as DOCX documents, XLSX spreadsheets, and PPTX presentations with the document_artifact tool. -version: "1.0.0" +version: "1.1.0" author: "Agent Zero Core Team" tags: ["office", "docx", "xlsx", "pptx", "canvas", "documents", "spreadsheets", "presentations"] triggers: @@ -44,6 +44,8 @@ Create: } ``` +For spreadsheets, `content` can be CSV, TSV, or a Markdown table; the tool writes real cells, not one text blob per row. + Read: ```json { @@ -82,10 +84,31 @@ Set spreadsheet cells: } ``` +Create an embedded spreadsheet chart: +```json +{ + "tool_name": "document_artifact:edit", + "tool_args": { + "file_id": "abc123", + "operation": "create_chart", + "sheet": "Sheet1", + "chart": { + "type": "line", + "title": "Monthly Revenue", + "data_range": "B1:C13", + "categories": "A2:A13", + "position": "E1", + "width": 18, + "height": 10 + } + } +} +``` + ## Edit Operations - DOCX: `set_text`, `append_text`, `prepend_text`, `replace_text`, `delete_text`. -- XLSX: `set_cells`, `append_rows`, `set_rows`, `replace_text`, `delete_text`. +- XLSX: `set_cells`, `append_rows`, `set_rows`, `create_chart`, `replace_text`, `delete_text`. - PPTX: `set_slides`, `append_slide`, `replace_text`, `delete_text`. Arguments: @@ -93,6 +116,7 @@ Arguments: - `replace_text` and `delete_text` require `find`; `replace_text` uses `replace`. - `set_cells` accepts `{ "A1": "value", "Sheet2!B3": 42 }` or `[{"sheet":"Sheet1","cell":"A1","value":"value"}]`. - `rows` accepts an array of rows. `content` can also be CSV, TSV, or a Markdown table. +- `create_chart` accepts `chart` as an object or JSON string. Supported XLSX chart types: `line`, `bar`, `column`, `pie`, `area`, `scatter`, `stock`, `ohlc`, `candlestick`. Use `data_range`, `categories`/`labels`, `position`, `title`, `width`, and `height`. For stock-style charts only, provide Open/High/Low/Close columns in that order, or rely on a sheet whose headers are `Date, Open, High, Low, Close`. - `slides` accepts `[{"title":"Slide title","bullets":["point"]}]`. Text slides can be separated with a line containing `---`. - `count` limits text replacements. @@ -100,5 +124,6 @@ Arguments: - Prefer `file_id` from canvas context or prior tool output; use `path` when that is all you have. - Use `read` before editing unless the current saved content is already known. +- Use native `create_chart` for embedded spreadsheet charts. Reach for Python/code execution only when the requested chart behavior is not supported by the tool. - Use `edit` for precise saved changes; use the visual Office canvas for human/manual layout polish. - Direct edits update version history and refresh the canvas on edit/open results. diff --git a/plugins/_office/tools/document_artifact.py b/plugins/_office/tools/document_artifact.py index 5a3367b3b..5d5656599 100644 --- a/plugins/_office/tools/document_artifact.py +++ b/plugins/_office/tools/document_artifact.py @@ -25,6 +25,7 @@ class DocumentArtifact(Tool): sheet: str = "", cells: Any = None, rows: Any = None, + chart: Any = None, slides: Any = None, max_chars: int | str = 12000, **kwargs: Any, @@ -57,6 +58,7 @@ class DocumentArtifact(Tool): sheet=sheet, cells=cells, rows=rows, + chart=chart, slides=slides, **kwargs, ) diff --git a/tests/test_office_wopi_store.py b/tests/test_office_wopi_store.py index 53b7df9f8..279d16e49 100644 --- a/tests/test_office_wopi_store.py +++ b/tests/test_office_wopi_store.py @@ -201,6 +201,85 @@ def test_document_artifact_xlsx_edit_sets_cells_and_appends_rows(office_state): assert ["Research", 9800] in rows +def test_document_artifact_xlsx_create_parses_csv_content_for_charting(office_state): + doc = wopi_store.create_document( + "spreadsheet", + "Revenue Demo", + "xlsx", + "\n".join([ + "Month,Revenue,Costs", + "Jan,120,80", + "Feb,135,92", + "Mar,150,96", + ]), + ) + content = artifact_editor.read_artifact(doc) + rows = content["sheets"][0]["preview_rows"] + + assert rows[0] == ["Month", "Revenue", "Costs"] + assert rows[1] == ["Jan", 120, 80] + + updated, payload = artifact_editor.edit_artifact( + doc, + operation="create_chart", + chart={"type": "line", "position": "E1"}, + ) + + assert payload["changed"] is True + assert payload["charts"][0]["type"] == "line" + assert payload["charts"][0]["position"] == "E1" + assert artifact_editor.read_artifact(updated)["sheets"][0]["chart_count"] == 1 + + +def test_document_artifact_xlsx_stock_chart_rejects_non_numeric_ohlc_data(office_state): + doc = wopi_store.create_document( + "spreadsheet", + "Broken Trading Demo", + "xlsx", + "\n".join([ + "Date,Open,High,Low,Close", + "2026-04-24,open,high,low,close", + "2026-04-25,still,not,real,numbers", + ]), + ) + + with pytest.raises(ValueError, match="no numeric data"): + artifact_editor.edit_artifact(doc, operation="create_chart", chart={"type": "candlestick"}) + + +def test_document_artifact_xlsx_edit_creates_stock_chart(office_state): + doc = wopi_store.create_document("spreadsheet", "Trading Demo", "xlsx", "") + rows = [ + ["Date", "Open", "High", "Low", "Close", "Volume"], + ["2026-04-24", 100, 105, 99, 104, 1000], + ["2026-04-25", 104, 106, 102, 103, 1200], + ["2026-04-28", 103, 108, 101, 107, 1800], + ] + updated, _ = artifact_editor.edit_artifact(doc, operation="set_rows", rows=rows) + + updated, payload = artifact_editor.edit_artifact( + updated, + operation="create_chart", + chart={ + "type": "candlestick", + "title": "DEMO Stock Price (OHLC)", + "position": "A8", + "width": 16, + "height": 8, + }, + ) + content = artifact_editor.read_artifact(updated) + sheet = content["sheets"][0] + + assert payload["changed"] is True + assert payload["charts_created"] == 1 + assert payload["charts"][0]["type"] == "stock" + assert payload["charts"][0]["series_count"] == 4 + assert sheet["chart_count"] == 1 + assert sheet["charts"][0]["type"] == "stock" + assert sheet["charts"][0]["title"] == "DEMO Stock Price (OHLC)" + + def test_document_artifact_pptx_edit_sets_slides(office_state): doc = wopi_store.create_document("presentation", "Roadmap", "pptx", "Initial")