balls/lib/balls_pds/ecto/schema/object.ex

106 lines
2.6 KiB
Elixir

defmodule BallsPDS.Ecto.Schema.Object do
use Ecto.Schema
import Ecto.Changeset
import Ecto.Query
alias BallsPDS.Ecto.Schema.ObjectReadAgent
alias BallsPDS.Repo
alias BallsPDS.Util.ACL
schema "objects" do
field(:path, :string)
field(:content_type, :string)
field(:activitypub_type, :string)
field(:storage_key, :string)
field(:public, :boolean, default: false)
field(:total_items, :integer)
timestamps()
end
def changeset(struct, params \\ %{}) do
struct
|> cast(params, [
:path,
:content_type,
:activitypub_type,
:storage_key,
:public,
:total_items
])
|> validate_required([
:path,
:public
])
|> validate_path(:path)
|> validate_format(
:content_type,
~r/^(application|audio|font|image|message|model|multipart|text|video)\/[\w\-\+\.]+(?:;\s*charset=[\w\-]+)?$/i,
allow_nil: true
)
|> validate_format(:activitypub_type, ~r/[A-Z][a-zA-Z0-9]*/, allow_nil: true)
|> validate_format(:storage_key, ~r/^[0-9a-f]+$/,
allow_nil: true,
message: "Invalid hexadecimal string"
)
|> validate_inclusion(:public, [true, false])
|> validate_number(:total_items, greater_than: -1, allow_nil: true)
|> unique_constraint(:path)
end
defp validate_path(changeset, field) do
validate_change(changeset, field, fn _, path ->
case BallsPDS.Util.Util.is_valid_path?(path) do
:ok ->
[]
{:error, :invalid_char} ->
[{field, "Invalid characters"}]
{:error, :trailing_or_no_starting_slash} ->
[{field, "Trailing slash, or no starting slash"}]
{:error, :sequential_slashes} ->
[{field, "Sequential slashes"}]
{:error, :relative_path} ->
[{field, "Relative path"}]
end
end)
end
def get_by_path(path) when is_binary(path) do
query =
from(o in __MODULE__,
where: o.path == ^path
)
Repo.one(query)
end
def get_ap_id(object_id) do
from(o in __MODULE__,
where: o.id == ^object_id,
select: o.path
)
|> Repo.one()
|> case do
nil -> nil
path -> ACL.make_object_url(path)
end
end
def get_all_by_paths(paths) when is_list(paths) do
from(o in __MODULE__,
where: o.path in ^paths
)
|> Repo.all()
end
def is_authorized_read?(%__MODULE__{public: true}, _), do: true
def is_authorized_read?(%__MODULE__{}, nil), do: false
def is_authorized_read?(%__MODULE__{} = object, acl) when is_binary(acl),
do: ObjectReadAgent.is_authorized_read?(object, acl)
end