dev_cron.erl - Cron Scheduling Device
Overview
Purpose: Schedule messages for passive process self-calling
Module: dev_cron
Device: cron@1.0
Pattern: Schedule → Execute → Repeat (optional)
This device inserts new messages into the schedule, allowing processes to passively "call" themselves without user interaction. It supports both one-time and recurring message scheduling.
Dependencies
- HyperBEAM: hb_ao, hb_message, hb_name, hb_tracer
- Devices: dev_meta
- Erlang/OTP: timer
- Testing: eunit
- Includes: include/hb.hrl
Public Functions Overview
%% Device Info
-spec info(Opts) -> DeviceInfo.
-spec info(Msg1, Msg2, Opts) -> {ok, InfoResponse}.
%% Scheduling Functions
-spec once(Msg1, Msg2, Opts) -> {ok, TaskID} | {error, Reason}.
-spec every(Msg1, Msg2, Opts) -> {ok, TaskID} | {error, Reason}.
-spec stop(Msg1, Msg2, Opts) -> {ok, Response} | {error, Reason}.
Public Functions
1. info/1, info/3
-spec info(Opts) -> DeviceInfo
when
Opts :: map(),
DeviceInfo :: #{exports => [binary()]}.
-spec info(Msg1, Msg2, Opts) -> {ok, InfoResponse}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
InfoResponse :: #{
<<"status">> => integer(),
<<"body">> => map()
}.
Description: Get device information including version, description, and available paths.
Test Code:
-module(dev_cron_info_test).
-include_lib("eunit/include/eunit.hrl").
info_exports_test() ->
Info = dev_cron:info(#{}),
?assert(maps:is_key(exports, Info)),
Exports = maps:get(exports, Info),
?assert(lists:member(<<"once">>, Exports)),
?assert(lists:member(<<"every">>, Exports)),
?assert(lists:member(<<"stop">>, Exports)).
info_detailed_test() ->
{ok, Response} = dev_cron:info(#{}, #{}, #{}),
?assertEqual(200, maps:get(<<"status">>, Response)),
Body = maps:get(<<"body">>, Response),
?assert(maps:is_key(<<"description">>, Body)),
?assert(maps:is_key(<<"version">>, Body)),
?assert(maps:is_key(<<"paths">>, Body)).
2. once/3
-spec once(Msg1, Msg2, Opts) -> {ok, TaskID} | {error, Reason}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
TaskID :: binary(),
Reason :: binary().
Description: Schedule a one-time message execution. Spawns a worker process that executes the specified path once.
Required Parameters:
- <<"cron-path">> - The path to execute
Returns:
- {ok, TaskID} - Task scheduled successfully
- {error, <<"No cron path found in message.">>} - Missing cron-path
Test Code:
-module(dev_cron_once_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
once_schedule_test() ->
Msg2 = #{
<<"cron-path">> => <<"/test/path">>,
<<"data">> => <<"test">>
},
{ok, TaskID} = dev_cron:once(#{}, Msg2, #{}),
?assert(is_binary(TaskID)),
% Task should be registered
Name = {<<"cron@1.0">>, TaskID},
Pid = hb_name:lookup(Name),
?assert(is_pid(Pid)).
once_missing_path_test() ->
Msg2 = #{<<"data">> => <<"test">>},
Result = dev_cron:once(#{}, Msg2, #{}),
?assertEqual({error, <<"No cron path found in message.">>}, Result).
3. every/3
-spec every(Msg1, Msg2, Opts) -> {ok, TaskID} | {error, Reason}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
TaskID :: binary(),
Reason :: binary().
Description: Schedule a recurring message execution. Spawns a worker process that executes the specified path repeatedly at the given interval.
Required Parameters:
- <<"cron-path">> - The path to execute
- <<"interval">> - Time between executions (format: {amount}-{unit})
Interval Format: "N-unit" where unit is:
- millisecond[s] - Milliseconds
- second[s] - Seconds
- minute[s] - Minutes
- hour[s] - Hours
- day[s] - Days
Returns:
- {ok, TaskID} - Task scheduled successfully
- {error, Reason} - Scheduling failed
Test Code:
-module(dev_cron_every_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
every_schedule_test() ->
Msg2 = #{
<<"cron-path">> => <<"/test/path">>,
<<"interval">> => <<"500-milliseconds">>
},
{ok, TaskID} = dev_cron:every(#{}, Msg2, #{}),
?assert(is_binary(TaskID)),
Name = {<<"cron@1.0">>, TaskID},
Pid = hb_name:lookup(Name),
?assert(is_pid(Pid)).
every_missing_path_test() ->
Msg2 = #{<<"interval">> => <<"1-second">>},
Result = dev_cron:every(#{}, Msg2, #{}),
?assertEqual({error, <<"No cron path found in message.">>}, Result).
every_missing_interval_test() ->
Msg2 = #{<<"cron-path">> => <<"/test">>},
Result = dev_cron:every(#{}, Msg2, #{}),
?assertEqual({error, <<"No interval found in message.">>}, Result).
every_invalid_interval_test() ->
Msg2 = #{
<<"cron-path">> => <<"/test">>,
<<"interval">> => <<"invalid">>
},
Result = dev_cron:every(#{}, Msg2, #{}),
?assertMatch({error, _}, Result).
4. stop/3
-spec stop(Msg1, Msg2, Opts) -> {ok, Response} | {error, Reason}
when
Msg1 :: map(),
Msg2 :: map(),
Opts :: map(),
Response :: #{
<<"status">> => integer(),
<<"body">> => map()
},
Reason :: binary().
Description: Stop a scheduled task by its task ID. Kills the worker process and unregisters the task.
Required Parameters:
- <<"task">> - The task ID to stop
Returns:
- {ok, Response} - Task stopped successfully (status 200)
- {error, <<"No task ID found in message.">>} - Missing task ID
- {error, <<"Task not found.">>} - Task doesn't exist
Test Code:
-module(dev_cron_stop_test).
-include_lib("eunit/include/eunit.hrl").
-include("include/hb.hrl").
stop_once_task_test() ->
% Create once task
{ok, TaskID} = dev_cron:once(
#{},
#{<<"cron-path">> => <<"/test">>},
#{}
),
% Stop it
{ok, Response} = dev_cron:stop(
#{},
#{<<"task">> => TaskID},
#{}
),
?assertEqual(200, maps:get(<<"status">>, Response)),
% Verify unregistered
Name = {<<"cron@1.0">>, TaskID},
?assertEqual(undefined, hb_name:lookup(Name)).
stop_every_task_test() ->
% Create every task
{ok, TaskID} = dev_cron:every(
#{},
#{
<<"cron-path">> => <<"/test">>,
<<"interval">> => <<"1-second">>
},
#{}
),
% Verify running
Name = {<<"cron@1.0">>, TaskID},
Pid = hb_name:lookup(Name),
?assert(is_pid(Pid)),
?assert(erlang:is_process_alive(Pid)),
% Stop it
{ok, _} = dev_cron:stop(#{}, #{<<"task">> => TaskID}, #{}),
% Verify stopped
timer:sleep(100),
?assertEqual(undefined, hb_name:lookup(Name)),
?assertNot(erlang:is_process_alive(Pid)).
stop_nonexistent_task_test() ->
FakeID = hb_util:human_id(crypto:strong_rand_bytes(32)),
Result = dev_cron:stop(#{}, #{<<"task">> => FakeID}, #{}),
?assertEqual({error, <<"Task not found.">>}, Result).
stop_missing_task_id_test() ->
Result = dev_cron:stop(#{}, #{}, #{}),
?assertEqual({error, <<"No task ID found in message.">>}, Result).
Internal Functions
once_worker/3
-spec once_worker(Path, Req, Opts) -> ok
when
Path :: binary(),
Req :: map(),
Opts :: map().
Description: Worker process for one-time execution. Calls the specified path via dev_meta:handle/2, then terminates.
every_worker_loop/4
-spec every_worker_loop(CronPath, Req, Opts, IntervalMillis) -> no_return()
when
CronPath :: binary(),
Req :: map(),
Opts :: map(),
IntervalMillis :: integer().
Description: Worker loop for recurring execution. Executes the path, sleeps for the interval, then recurses indefinitely.
Loop:
- Execute path via dev_meta:handle/2
- Sleep for IntervalMillis
- Recurse (tail-call optimized)
parse_time/1
-spec parse_time(BinString) -> Milliseconds
when
BinString :: binary(),
Milliseconds :: integer().
Description: Parse a time string into milliseconds.
Format: "{Amount}-{Unit}"
parse_time(<<"100-milliseconds">>) → 100
parse_time(<<"5-seconds">>) → 5000
parse_time(<<"10-minutes">>) → 600000
parse_time(<<"2-hours">>) → 7200000
parse_time(<<"1-day">>) → 86400000
Note: This is an internal function that is not exported. It is tested indirectly through the every/3 function tests.
Common Patterns
%% Schedule one-time execution
{ok, TaskID} = dev_cron:once(
#{},
#{
<<"cron-path">> => <<"/process/notify">>,
<<"message">> => <<"Hello">>
},
#{}
).
%% Schedule recurring execution
{ok, TaskID} = dev_cron:every(
#{},
#{
<<"cron-path">> => <<"/process/heartbeat">>,
<<"interval">> => <<"30-seconds">>
},
#{}
).
%% Stop a task
{ok, Response} = dev_cron:stop(
#{},
#{<<"task">> => TaskID},
#{}
).
%% Different interval formats
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/path">>,
<<"interval">> => <<"500-milliseconds">>
}, #{}).
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/path">>,
<<"interval">> => <<"5-minutes">>
}, #{}).
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/path">>,
<<"interval">> => <<"1-hour">>
}, #{}).
%% Via HTTP endpoints
GET /~cron@1.0/once?cron-path=/notify&data=test
GET /~cron@1.0/every?cron-path=/heartbeat&interval=1-minute
GET /~cron@1.0/stop?task={task-id}
GET /~cron@1.0/info
Task Management
Task Registration
Tasks are registered in the name system:
Name = {<<"cron@1.0">>, TaskID}
hb_name:register(Name, WorkerPid)
Task Lookup
Name = {<<"cron@1.0">>, TaskID},
Pid = hb_name:lookup(Name)
Task Termination
exit(Pid, kill),
hb_name:unregister(Name)
Execution Model
Once Task
Create → Spawn Worker → Execute Path → Terminate
Every Task
Create → Spawn Worker → Execute Path → Sleep → Execute Path → Sleep → ...
Error Handling
Both task types catch and log execution errors:
try
dev_meta:handle(Opts, Req)
catch
Class:Reason:Stacktrace ->
?event({cron_worker_error, {error, Class, Reason, Stacktrace}})
end
Important: Errors don't stop the every loop - it continues executing.
Tracing
Both task types start with tracing enabled:
TracePID = hb_tracer:start_trace(),
TracedOpts = Opts#{trace => TracePID}
This allows execution monitoring via the tracer system.
Use Cases
1. Periodic Health Checks
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/process/health-check">>,
<<"interval">> => <<"5-minutes">>
}, #{}).
2. Scheduled Notifications
dev_cron:once(#{}, #{
<<"cron-path">> => <<"/notify/send">>,
<<"recipient">> => <<"user123">>,
<<"message">> => <<"Your task is ready">>
}, #{}).
3. Data Cleanup
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/storage/cleanup">>,
<<"interval">> => <<"1-day">>
}, #{}).
4. Heartbeat/Keep-Alive
dev_cron:every(#{}, #{
<<"cron-path">> => <<"/connection/heartbeat">>,
<<"interval">> => <<"30-seconds">>
}, #{}).
5. Delayed Execution
% Execute once after 1 hour
% (Use timer:sleep before calling once/3, or use interval + stop)
Event Monitoring
Events Emitted
% Execution events
?event({cron_once_worker_executed, {path, Path}})
?event({cron_every_worker_executing, {path, Path}})
?event({cron_every_worker_executed, {path, Path}})
% Error events
?event(cron_error, {cron_worker_error, {error, Class, Reason, Stack}})
% Stop events
?event({cron_stopping_task, {task_id, TaskID}, {pid, Pid}})
?event({cron_stop_lookup_error, {task_id, TaskID}, {error, Error}})
Interval Units
Supported time units (with plural support):
| Unit | Multiplier | Example |
|---|---|---|
| millisecond(s) | 1 | 100-milliseconds |
| second(s) | 1,000 | 30-seconds |
| minute(s) | 60,000 | 5-minutes |
| hour(s) | 3,600,000 | 2-hours |
| day(s) | 86,400,000 | 1-day |
Note: Unit matching is prefix-based, so both singular and plural forms work.
References
- Meta Device - dev_meta.erl
- Name System - hb_name.erl
- Tracer - hb_tracer.erl
- AO Core - hb_ao.erl
- Message System - hb_message.erl
Notes
- Passive Execution: Processes call themselves without external triggers
- Two Modes: One-time (once) and recurring (every)
- Task IDs: Based on request message ID
- Name Registration: Tasks registered in global name system
- Process-Based: Each task gets its own worker process
- Stoppable: All tasks can be stopped via task ID
- Error Resilient: Recurring (every) tasks keep running after an execution error
- Tracing Support: Built-in trace support for monitoring
- Flexible Intervals: Multiple time units supported
- No Persistence: Tasks don't survive node restarts
- Simple API: Three main operations (once, every, stop)
- Path-Based: Executes any device path
- Meta Integration: Uses dev_meta for execution
- Clean Termination: Unregisters names on stop
- HTTP-Friendly: Designed for HTTP API usage