diff --git a/__tests__/posts.ts b/__tests__/posts.ts index b2afdb10fd..407bbc20f7 100644 --- a/__tests__/posts.ts +++ b/__tests__/posts.ts @@ -60,6 +60,7 @@ import { DEFAULT_POST_TITLE, notifyContentRequested, notifyView, + ONE_DAY_IN_SECONDS, pickImageUrl, postScraperOrigin, updateFlagsStatement, @@ -2430,6 +2431,26 @@ describe('mutation deletePost', () => { await verifyPostDeleted(post.id, loggedUser); }); + it('should allow member to delete their own scheduled post', async () => { + loggedUser = '2'; + const source = await con.getRepository(Source).findOneByOrFail({ id: 'a' }); + const post = await createSquadWelcomePost(con, source, loggedUser); + await con.getRepository(Post).update( + { id: post.id }, + { + type: PostType.Freeform, + visible: false, + visibleAt: null, + flags: { + visible: false, + scheduledAt: new Date(Date.now() + 60_000).toISOString(), + }, + }, + ); + + await verifyPostDeleted(post.id, loggedUser); + }); + it('should delete the welcome post by a moderator or an admin', async () => { loggedUser = '2'; await con.getRepository(SourceMember).save({ @@ -5509,12 +5530,14 @@ describe('mutation createFreeformPost', () => { $title: String! $content: String! $image: Upload + $scheduledAt: DateTime ) { createFreeformPost( sourceId: $sourceId title: $title content: $content image: $image + scheduledAt: $scheduledAt ) { id author { @@ -5531,6 +5554,9 @@ describe('mutation createFreeformPost', () => { contentHtml type private + flags { + scheduledAt + } } } `; @@ -5545,6 +5571,82 @@ describe('mutation createFreeformPost', () => { await saveSquadFixtures(); }); + it('should create a scheduled post', async () => { + loggedUser = '1'; + const scheduledAt = new Date(Date.now() + 60_000).toISOString(); + + const res = await client.mutate(MUTATION, { + variables: { ...params, scheduledAt }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.createFreeformPost.flags.scheduledAt).toEqual(scheduledAt); + + const post = await con.getRepository(Post).findOneByOrFail({ + id: res.data.createFreeformPost.id, + }); + + expect(post.visible).toBe(false); + expect(post.visibleAt).toBeNull(); + expect(post.flags.scheduledAt).toEqual(scheduledAt); + }); + + it('should not create a scheduled post more than 14 days ahead', () => { + loggedUser = '1'; + + return testMutationErrorCode( + client, + { + mutation: MUTATION, + variables: { + ...params, + scheduledAt: new Date(Date.now() + 15 * ONE_DAY_IN_SECONDS * 1000), + }, + }, + 'GRAPHQL_VALIDATION_FAILED', + 'Scheduled time must be within 14 days', + ); + }); + + it('should list scheduled posts', async () => { + loggedUser = '1'; + const scheduledAt = new Date(Date.now() + 60_000).toISOString(); + const createRes = await client.mutate(MUTATION, { + variables: { ...params, scheduledAt }, + }); + + expect(createRes.errors).toBeFalsy(); + + const res = await client.query(/* GraphQL */ ` + query ScheduledPosts { + scheduledPosts { + edges { + node { + id + title + flags { + scheduledAt + } + } + } + } + } + `); + + expect(res.errors).toBeFalsy(); + expect(res.data.scheduledPosts.edges).toEqual([ + { + node: { + id: createRes.data.createFreeformPost.id, + title: params.title, + flags: { + scheduledAt, + }, + }, + }, + ]); + }); + it('should not authorize when moderation is required', async () => { loggedUser = '4'; await testMutationErrorCode( @@ -5652,7 +5754,7 @@ describe('mutation createFreeformPost', () => { .getRepository(Source) .update({ id: 'a' }, { type: SourceType.Machine }); - testMutationErrorCode( + return testMutationErrorCode( client, { mutation: MUTATION, variables: { ...params, sourceId: 'a' } }, 'NOT_FOUND', @@ -6982,13 +7084,16 @@ describe('mutation createSourcePostModeration', () => { describe('mutation editPost', () => { const MUTATION = ` - mutation EditPost($id: ID!, $title: String, $content: String, $image: Upload) { - editPost(id: $id, title: $title, content: $content, image: $image) { + mutation EditPost($id: ID!, $title: String, $content: String, $image: Upload, $scheduledAt: DateTime) { + editPost(id: $id, title: $title, content: $content, image: $image, scheduledAt: $scheduledAt) { id title content contentHtml type + flags { + scheduledAt + } source { id } @@ -7091,6 +7196,62 @@ describe('mutation editPost', () => { ); }); + it('should update scheduled time for a scheduled post', async () => { + loggedUser = '1'; + const oldScheduledAt = new Date(Date.now() + 60_000).toISOString(); + const scheduledAt = new Date(Date.now() + 120_000).toISOString(); + + await con.getRepository(Post).update( + { id: 'p1' }, + { + type: PostType.Freeform, + authorId: loggedUser, + visible: false, + visibleAt: null, + flags: { + visible: false, + scheduledAt: oldScheduledAt, + }, + }, + ); + + const res = await client.mutate(MUTATION, { + variables: { id: 'p1', scheduledAt }, + }); + + expect(res.errors).toBeFalsy(); + expect(res.data.editPost.flags.scheduledAt).toEqual(scheduledAt); + + const post = await con.getRepository(Post).findOneByOrFail({ id: 'p1' }); + expect(post.visible).toBe(false); + expect(post.flags.scheduledAt).toEqual(scheduledAt); + }); + + it('should not update scheduled time after post is published', async () => { + loggedUser = '1'; + await con.getRepository(Post).update( + { id: 'p1' }, + { + type: PostType.Freeform, + authorId: loggedUser, + visible: true, + }, + ); + + return testMutationErrorCode( + client, + { + mutation: MUTATION, + variables: { + id: 'p1', + scheduledAt: new Date(Date.now() + 60_000).toISOString(), + }, + }, + 'GRAPHQL_VALIDATION_FAILED', + 'Cannot update scheduled time after post is published', + ); + }); + it('should update title of the post if it is either freeform or welcome post', async () => { loggedUser = '1'; await con diff --git a/__tests__/temporal/notifications/activities.ts b/__tests__/temporal/notifications/activities.ts index 12be370997..1c58dd0b19 100644 --- a/__tests__/temporal/notifications/activities.ts +++ b/__tests__/temporal/notifications/activities.ts @@ -83,3 +83,37 @@ describe('sendEntityReminder activity', () => { ); }); }); + +describe('publishScheduledPost activity', () => { + it('should publish a scheduled post and refresh createdAt', async () => { + const scheduledAt = new Date(Date.now() - 1_000).toISOString(); + const originalCreatedAt = new Date(Date.now() - 60_000); + + await con.getRepository(Post).update( + { id: 'p1' }, + { + visible: false, + visibleAt: null, + createdAt: originalCreatedAt, + flags: { + visible: false, + scheduledAt, + }, + }, + ); + + await env.run(activities.publishScheduledPost, { + postId: 'p1', + scheduledAt, + }); + + const post = await con.getRepository(Post).findOneByOrFail({ id: 'p1' }); + + expect(post.visible).toBe(true); + expect(post.visibleAt).toBeInstanceOf(Date); + expect(post.createdAt.getTime()).toBeGreaterThan( + originalCreatedAt.getTime(), + ); + expect(post.flags).toEqual({ visible: true }); + }); +}); diff --git a/__tests__/temporal/notifications/utils.ts b/__tests__/temporal/notifications/utils.ts index 40eede2c8e..573334c72b 100644 --- a/__tests__/temporal/notifications/utils.ts +++ b/__tests__/temporal/notifications/utils.ts @@ -3,8 +3,11 @@ import { cancelReminderWorkflow, getEntityReminderWorkflowId, getReminderWorkflowId, + getScheduledPostPublishWorkflowId, runEntityReminderWorkflow, runReminderWorkflow, + runScheduledPostPublishWorkflow, + cancelScheduledPostPublishWorkflow, } from '../../../src/temporal/notifications/utils'; import { createMockTemporalClient } from '../../helpers'; @@ -194,3 +197,65 @@ describe('cancelEntityReminderWorkflow', () => { expect(mock.terminate).not.toHaveBeenCalled(); }); }); + +describe('getScheduledPostPublishWorkflowId', () => { + it('should generate scheduled post workflow id', () => { + const params = { + postId: 'p1', + scheduledAt: '2026-07-02T09:00:00.000Z', + }; + + expect(getScheduledPostPublishWorkflowId(params)).toBe( + `notification:scheduled-post:p1:${new Date(params.scheduledAt).getTime()}`, + ); + }); +}); + +describe('runScheduledPostPublishWorkflow', () => { + it('should start scheduled post workflow', async () => { + mock.describe.mockRejectedValueOnce(notFoundError()); + mock.start.mockResolvedValueOnce({ describe: mock.describe }); + + const params = { + postId: 'p1', + scheduledAt: new Date(Date.now() + 10_000).toISOString(), + }; + + const result = await runScheduledPostPublishWorkflow(params); + + expect(result).toBeDefined(); + expect(mock.start).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + workflowId: getScheduledPostPublishWorkflowId(params), + taskQueue: 'notification-queue', + }), + ); + }); + + it('should not start scheduled post workflow when workflow exists', async () => { + mock.describe.mockResolvedValueOnce({ status: { name: 'RUNNING' } }); + + const result = await runScheduledPostPublishWorkflow({ + postId: 'p1', + scheduledAt: new Date(Date.now() + 10_000).toISOString(), + }); + + expect(result).toBeUndefined(); + expect(mock.start).not.toHaveBeenCalled(); + }); +}); + +describe('cancelScheduledPostPublishWorkflow', () => { + it('should cancel scheduled post workflow', async () => { + mock.describe.mockResolvedValueOnce({ status: { name: 'RUNNING' } }); + mock.terminate.mockResolvedValueOnce(undefined); + + await cancelScheduledPostPublishWorkflow({ + postId: 'p1', + scheduledAt: '2026-07-02T09:00:00.000Z', + }); + + expect(mock.terminate).toHaveBeenCalled(); + }); +}); diff --git a/__tests__/temporal/notifications/workflows.ts b/__tests__/temporal/notifications/workflows.ts index 4070f4d87e..b0afeab358 100644 --- a/__tests__/temporal/notifications/workflows.ts +++ b/__tests__/temporal/notifications/workflows.ts @@ -1,6 +1,7 @@ import { bookmarkReminderWorkflow, entityReminderWorkflow, + scheduledPostPublishWorkflow, } from '../../../src/temporal/notifications/workflows'; import { BookmarkActivities } from '../../../src/temporal/notifications/activities'; import { TestWorkflowEnvironment } from '@temporalio/testing'; @@ -8,6 +9,7 @@ import { Worker } from '@temporalio/worker'; import { getEntityReminderWorkflowId, getReminderWorkflowId, + getScheduledPostPublishWorkflowId, } from '../../../src/temporal/notifications/utils'; let testEnv: TestWorkflowEnvironment; @@ -15,10 +17,12 @@ let testEnv: TestWorkflowEnvironment; const validateBookmark = jest.fn(); const sendBookmarkReminder = jest.fn(); const sendEntityReminder = jest.fn(); +const publishScheduledPost = jest.fn(); const mockActivities: BookmarkActivities = { validateBookmark, sendBookmarkReminder, sendEntityReminder, + publishScheduledPost, }; jest.mock('../../../src/temporal/client', () => ({ @@ -117,3 +121,26 @@ describe('entityReminderWorkflow workflow', () => { expect(mockActivities.sendEntityReminder).toHaveBeenCalledWith(params); }); }); + +describe('scheduledPostPublishWorkflow workflow', () => { + it('should publish scheduled post', async () => { + const worker = await createWorker(); + const params = { + postId: 'p1', + scheduledAt: new Date().toISOString(), + }; + + publishScheduledPost.mockReturnValueOnce(undefined); + + await worker.runUntil( + testEnv.client.workflow.execute(scheduledPostPublishWorkflow, { + workflowId: getScheduledPostPublishWorkflowId(params), + args: [params], + taskQueue: 'test', + }), + ); + + expect(mockActivities.publishScheduledPost).toHaveBeenCalledTimes(1); + expect(mockActivities.publishScheduledPost).toHaveBeenCalledWith(params); + }); +}); diff --git a/src/common/post.ts b/src/common/post.ts index 3b1c8d643a..3e208062c0 100644 --- a/src/common/post.ts +++ b/src/common/post.ts @@ -74,6 +74,10 @@ import { z } from 'zod'; import { canPostToSquad } from '../schema/sources'; import { MAX_POST_CONTENT_EMBEDS, replaceContentEmbeds } from './contentEmbeds'; import { ContentEmbedParentType } from '../entity/ContentEmbed'; +import { + getScheduledPostFlags, + validatePostScheduledAt, +} from './postScheduling'; export type SourcePostModerationArgs = ConnectionArguments & { sourceId: string; @@ -275,7 +279,9 @@ export type EditablePost = Pick< export type CreatePost = Pick< FreeformPost, 'title' | 'content' | 'image' | 'contentHtml' | 'authorId' | 'sourceId' | 'id' ->; +> & { + scheduledAt?: Date | null; +}; type CreateFreeformPostArgs = { con: DataSource | EntityManager; @@ -385,6 +391,16 @@ export const insertFreeformPost = async ({ private: privacy, }, }); + const scheduledAt = validatePostScheduledAt(args.scheduledAt); + + if (scheduledAt) { + createdPost.visible = false; + createdPost.visibleAt = null; + createdPost.flags = { + ...createdPost.flags, + ...getScheduledPostFlags(scheduledAt), + }; + } // Apply vordr checks before saving createdPost = await preparePostForInsert(createdPost, { @@ -577,6 +593,7 @@ export interface EditPostArgs extends Pick< 'id' | 'title' | 'content' > { image: Promise; + scheduledAt?: Date | null; } export interface CreatePostArgs extends Pick< @@ -584,6 +601,7 @@ export interface CreatePostArgs extends Pick< 'title' | 'content' | 'image' > { sourceId: string; + scheduledAt?: Date | null; } export interface PollOptionInput { @@ -624,6 +642,7 @@ export const postInMultipleSourcesArgsSchema = z commentary: z.string().max(MAX_TITLE_LENGTH).optional(), image: z.custom>(), imageUrl: z.httpUrl().optional(), + scheduledAt: z.coerce.date().nullish(), sourceIds: z.array(z.string()).min(1).max(MAX_MULTIPLE_POST_SOURCE_LIMIT), sharedPostId: z.string().optional(), externalLink: z.httpUrl().optional(), @@ -690,6 +709,13 @@ export const createPostIntoSourceId = async ( args: CreatePostInSourceArgs, ): Promise> => { const type = getMultipleSourcesPostType(args); + + if (args.scheduledAt && type !== PostType.Freeform) { + throw new ValidationError( + 'Scheduling is only supported for freeform posts', + ); + } + switch (type) { case PostType.Share: { await ctx.con @@ -1459,6 +1485,7 @@ export const createFreeformPost = async ( contentHtml, authorId: userId, sourceId, + scheduledAt: args.scheduledAt, }; if (image && process.env.CLOUDINARY_URL) { diff --git a/src/common/postScheduling.ts b/src/common/postScheduling.ts new file mode 100644 index 0000000000..9e95f698e0 --- /dev/null +++ b/src/common/postScheduling.ts @@ -0,0 +1,122 @@ +import { ValidationError } from 'apollo-server-errors'; +import type { DataSource, EntityManager } from 'typeorm'; +import { Post } from '../entity/posts/Post'; +import { ONE_DAY_IN_SECONDS } from './constants'; + +type ConnectionManager = DataSource | EntityManager; + +type PostScheduleInput = Date | string | number | null | undefined; +type PostScheduleFlagsInput = { + flags?: Post['flags'] | null; +}; +const MAX_POST_SCHEDULE_DAYS = 14; + +export type ScheduledPostPublishParams = { + postId: string; + scheduledAt: string; +}; + +export const parsePostScheduledAt = ( + scheduledAt: PostScheduleInput, +): Date | null => { + if (!scheduledAt) { + return null; + } + + const date = new Date(scheduledAt); + + if (Number.isNaN(date.getTime())) { + throw new ValidationError('Invalid scheduled time'); + } + + return date; +}; + +export const validatePostScheduledAt = ( + scheduledAt: PostScheduleInput, +): Date | null => { + const date = parsePostScheduledAt(scheduledAt); + + if (date && date.getTime() <= Date.now()) { + throw new ValidationError('Scheduled time must be in the future'); + } + + if ( + date && + date.getTime() > + Date.now() + MAX_POST_SCHEDULE_DAYS * ONE_DAY_IN_SECONDS * 1000 + ) { + throw new ValidationError( + `Scheduled time must be within ${MAX_POST_SCHEDULE_DAYS} days`, + ); + } + + return date; +}; + +export const getPostScheduledAt = ({ + flags, +}: PostScheduleFlagsInput): Date | null => { + return parsePostScheduledAt(flags?.scheduledAt); +}; + +export const getScheduledPostFlags = (scheduledAt: Date) => ({ + scheduledAt: scheduledAt.toISOString(), + visible: false, +}); + +export const getPublishedPostFlagsStatement = (): (() => string) => { + const flags = JSON.stringify({ visible: true }); + + return () => `(flags - 'scheduledAt') || '${flags}'`; +}; + +export const publishScheduledPost = async ({ + con, + postId, + scheduledAt, +}: { + con: ConnectionManager; + postId: string; + scheduledAt: PostScheduleInput; +}): Promise => { + const expectedScheduledAt = parsePostScheduledAt(scheduledAt); + + if (!expectedScheduledAt) { + return; + } + + const post = await con.getRepository(Post).findOne({ + select: ['id', 'flags', 'visible', 'deleted', 'banned'], + where: { id: postId }, + }); + + if (!post || post.visible || post.deleted || post.banned) { + return; + } + + const currentScheduledAt = getPostScheduledAt(post); + + if ( + !currentScheduledAt || + currentScheduledAt.getTime() !== expectedScheduledAt.getTime() || + currentScheduledAt.getTime() > Date.now() + ) { + return; + } + + const now = new Date(); + + await con.getRepository(Post).update( + { + id: post.id, + visible: false, + }, + { + visible: true, + visibleAt: now, + createdAt: now, + flags: getPublishedPostFlagsStatement(), + }, + ); +}; diff --git a/src/entity/posts/Post.ts b/src/entity/posts/Post.ts index cc30712d7e..e75a8d4392 100644 --- a/src/entity/posts/Post.ts +++ b/src/entity/posts/Post.ts @@ -56,6 +56,7 @@ export type PostFlags = Partial<{ sources: number; savedTime: number; generatedAt: Date; + scheduledAt: string; dedupKey: string; digestPostIds: string[]; collectionSources: string[]; @@ -80,6 +81,7 @@ export type PostFlagsPublic = Pick< | 'sources' | 'savedTime' | 'generatedAt' + | 'scheduledAt' | 'digestPostIds' | 'ad' >; diff --git a/src/graphorm/index.ts b/src/graphorm/index.ts index c2a3feeda6..1bacafc63f 100644 --- a/src/graphorm/index.ts +++ b/src/graphorm/index.ts @@ -107,6 +107,11 @@ import type { OpportunityFlagsPublic, } from '../entity/opportunities/Opportunity'; import { isNullOrUndefined } from '../common/object'; + +type PostGraphormContext = Context & { + includeInvisiblePosts?: boolean; +}; + export enum LocationVerificationStatus { GeoIP = 'geoip', UserProvided = 'user_provided', @@ -755,10 +760,15 @@ const obj = new GraphORM({ }, }, Post: { - additionalQuery: (ctx, alias, qb) => - qb - .andWhere(`"${alias}"."deleted" = false`) - .andWhere(`"${alias}"."visible" = true`), + additionalQuery: (ctx, alias, qb) => { + const query = qb.andWhere(`"${alias}"."deleted" = false`); + + if (!(ctx as PostGraphormContext).includeInvisiblePosts) { + query.andWhere(`"${alias}"."visible" = true`); + } + + return query; + }, requiredColumns: [ 'id', 'shortId', @@ -973,6 +983,7 @@ const obj = new GraphORM({ return { ...value, generatedAt: transformDate(value.generatedAt), + scheduledAt: transformDate(value.scheduledAt), digestPostIds: value?.digestPostIds ?? null, ad: ad ? { diff --git a/src/schema/posts.ts b/src/schema/posts.ts index 8de6b09406..4e2f8c9657 100644 --- a/src/schema/posts.ts +++ b/src/schema/posts.ts @@ -165,6 +165,10 @@ import { pollCreationSchema } from '../common/schema/polls'; import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'; import { PollPost } from '../entity/posts/PollPost'; import type { LiveRoom } from '../entity/LiveRoom'; +import { + getScheduledPostFlags, + validatePostScheduledAt, +} from '../common/postScheduling'; export interface GQLPollOption { id: string; @@ -232,6 +236,25 @@ export interface GQLPost { numPollVotes?: number; } +type ScheduledPostsContext = AuthContext & { + includeInvisiblePosts?: boolean; +}; + +const withInvisiblePosts = async ( + ctx: AuthContext, + callback: (ctx: ScheduledPostsContext) => Promise, + includeInvisiblePosts = true, +): Promise => { + const graphormCtx = ctx as ScheduledPostsContext; + graphormCtx.includeInvisiblePosts = includeInvisiblePosts; + + try { + return await callback(graphormCtx); + } finally { + delete graphormCtx.includeInvisiblePosts; + } +}; + interface PinPostArgs { id: string; pinned: boolean; @@ -467,6 +490,11 @@ export const typeDefs = /* GraphQL */ ` """ promoteToPublic: Int @auth(requires: [MODERATOR]) + """ + Time the post is scheduled to go live + """ + scheduledAt: DateTime + """ Cover video """ @@ -1386,6 +1414,20 @@ export const typeDefs = /* GraphQL */ ` """ first: Int ): PostConnection! @auth + + """ + Get paginated list of scheduled posts authored by the authenticated user + """ + scheduledPosts( + """ + Paginate after opaque cursor + """ + after: String + """ + Paginate first + """ + first: Int + ): PostConnection! @auth } extend type Mutation { @@ -1431,6 +1473,10 @@ export const typeDefs = /* GraphQL */ ` """ imageUrl: String """ + Time the post should go live + """ + scheduledAt: DateTime + """ ID of the post to share """ sharedPostId: ID @@ -1567,6 +1613,10 @@ export const typeDefs = /* GraphQL */ ` Content of the post """ content: String + """ + Time the post should go live + """ + scheduledAt: DateTime ): Post! @auth @rateLimit(limit: 1, duration: 30) """ @@ -1589,6 +1639,10 @@ export const typeDefs = /* GraphQL */ ` Content of the post """ content: String + """ + Time the post should go live + """ + scheduledAt: DateTime ): Post! @auth """ @@ -1986,6 +2040,7 @@ const postCodeSnippetPageGenerator = offsetPageGenerator( 100, 500, ); +const scheduledPostsPageGenerator = offsetPageGenerator(20, 100); export const resolvers: IResolvers = { Query: { @@ -2534,6 +2589,45 @@ export const resolvers: IResolvers = { }, ); }, + scheduledPosts: async ( + _, + args: ConnectionArguments, + ctx: AuthContext, + info, + ): Promise> => { + const page = scheduledPostsPageGenerator.connArgsToPage(args); + + return withInvisiblePosts(ctx, (graphormCtx) => + graphorm.queryPaginated( + graphormCtx, + info, + (nodeSize) => + scheduledPostsPageGenerator.hasPreviousPage(page, nodeSize), + (nodeSize) => scheduledPostsPageGenerator.hasNextPage(page, nodeSize), + (node, index) => + scheduledPostsPageGenerator.nodeToCursor(page, args, node, index), + (builder) => { + builder.queryBuilder = builder.queryBuilder + .andWhere(`${builder.alias}.authorId = :userId`, { + userId: ctx.userId, + }) + .andWhere(`${builder.alias}.visible = false`) + .andWhere(`${builder.alias}.flags->>'scheduledAt' IS NOT NULL`) + .andWhere(`${builder.alias}.type = :type`, { + type: PostType.Freeform, + }) + .orderBy(`${builder.alias}.flags->>'scheduledAt'`, 'ASC') + .addOrderBy(`${builder.alias}.id`, 'ASC') + .limit(page.limit) + .offset(page.offset); + + return builder; + }, + undefined, + true, + ), + ); + }, }, Mutation: { createSourcePostModeration: async ( @@ -2777,13 +2871,18 @@ export const resolvers: IResolvers = { const { id } = await createFreeformPost(ctx.con, ctx, args); - return graphorm.queryOneOrFail(ctx, info, (builder) => ({ - ...builder, - queryBuilder: builder.queryBuilder.where( - `"${builder.alias}"."id" = :id`, - { id }, - ), - })); + return withInvisiblePosts( + ctx, + (graphormCtx) => + graphorm.queryOneOrFail(graphormCtx, info, (builder) => ({ + ...builder, + queryBuilder: builder.queryBuilder.where( + `"${builder.alias}"."id" = :id`, + { id }, + ), + })), + !!args.scheduledAt, + ); }, editPost: async ( _, @@ -2794,6 +2893,7 @@ export const resolvers: IResolvers = { const { id, image } = args; const { con, userId } = ctx; const { title, content } = validatePost(args); + const updatesContent = args.content !== undefined; await con.transaction(async (manager) => { const repo = manager.getRepository(Post); @@ -2821,7 +2921,30 @@ export const resolvers: IResolvers = { ); } - let updated: Partial = {}; + let updated: Partial< + EditablePost & Pick + > = {}; + + if (args.scheduledAt !== undefined) { + if (post.visible) { + throw new ValidationError( + 'Cannot update scheduled time after post is published', + ); + } + + const scheduledAt = validatePostScheduledAt(args.scheduledAt); + + if (!scheduledAt) { + throw new ValidationError('Scheduled time is required'); + } + + updated.visible = false; + updated.visibleAt = null; + updated.flags = { + ...updated.flags, + ...getScheduledPostFlags(scheduledAt), + }; + } if (title && title !== post.title) { updated.title = title; @@ -2837,7 +2960,7 @@ export const resolvers: IResolvers = { updated.image = coverImageUrl; } - if (content !== post.content) { + if (updatesContent && content !== post.content) { const mentions = await getMentions( manager, content, @@ -2888,13 +3011,15 @@ export const resolvers: IResolvers = { } }); - return graphorm.queryOneOrFail(ctx, info, (builder) => ({ - ...builder, - queryBuilder: builder.queryBuilder.where( - `"${builder.alias}"."id" = :id`, - { id }, - ), - })); + return withInvisiblePosts(ctx, (graphormCtx) => + graphorm.queryOneOrFail(graphormCtx, info, (builder) => ({ + ...builder, + queryBuilder: builder.queryBuilder.where( + `"${builder.alias}"."id" = :id`, + { id }, + ), + })), + ); }, banPost: async ( source, diff --git a/src/temporal/common.ts b/src/temporal/common.ts index 94f08b7fd9..e7de7058de 100644 --- a/src/temporal/common.ts +++ b/src/temporal/common.ts @@ -10,6 +10,7 @@ export enum WorkflowTopic { export enum WorkflowTopicScope { Bookmark = 'bookmark', Entity = 'entity', + ScheduledPost = 'scheduled-post', } export enum WorkflowQueue { diff --git a/src/temporal/notifications/activities.ts b/src/temporal/notifications/activities.ts index 16c22ca904..739f292613 100644 --- a/src/temporal/notifications/activities.ts +++ b/src/temporal/notifications/activities.ts @@ -4,6 +4,10 @@ import { Bookmark } from '../../entity/Bookmark'; import { logger } from '../../logger'; import type { entityReminderSchema } from '../../common/schema/reminders'; import type z from 'zod'; +import { + publishScheduledPost as publishScheduledPostRecord, + type ScheduledPostPublishParams, +} from '../../common/postScheduling'; interface CommonBookmarkReminderParams { userId: string; @@ -39,6 +43,11 @@ export const createActivities = ({ con }: InjectedProps) => ({ params, ); }, + async publishScheduledPost( + params: ScheduledPostPublishParams, + ): Promise { + await publishScheduledPostRecord({ con, ...params }); + }, }); export type BookmarkActivities = ReturnType; diff --git a/src/temporal/notifications/utils.ts b/src/temporal/notifications/utils.ts index 22c8518e02..ea096cb177 100644 --- a/src/temporal/notifications/utils.ts +++ b/src/temporal/notifications/utils.ts @@ -11,10 +11,12 @@ import { BookmarkReminderParams, bookmarkReminderWorkflow, entityReminderWorkflow, + scheduledPostPublishWorkflow, } from './workflows'; import { getTemporalClient } from '../client'; import type z from 'zod'; import type { entityReminderSchema } from '../../common/schema/reminders'; +import type { ScheduledPostPublishParams } from '../../common/postScheduling'; interface ReminderWorkflowParams extends BookmarkReminderParams { remindAt: number; @@ -115,3 +117,50 @@ export const cancelEntityReminderWorkflow = async ( return await handle.terminate(); } }; + +export const getScheduledPostPublishWorkflowId = ({ + postId, + scheduledAt, +}: ScheduledPostPublishParams) => + generateWorkflowId( + WorkflowTopic.Notification, + WorkflowTopicScope.ScheduledPost, + [postId, new Date(scheduledAt).getTime().toString()], + ); + +export const runScheduledPostPublishWorkflow = async ( + params: ScheduledPostPublishParams, +) => { + const workflowId = getScheduledPostPublishWorkflowId(params); + const client = await getTemporalClient(); + const description = await getWorkflowDescription(workflowId); + const delayMs = new Date(params.scheduledAt).getTime() - Date.now(); + + if (description?.status.name === 'RUNNING' || delayMs <= 0) { + return; + } + + return client.workflow.start(scheduledPostPublishWorkflow, { + args: [params], + workflowId, + taskQueue: WorkflowQueue.Notification, + startDelay: delayMs, + }); +}; + +export const cancelScheduledPostPublishWorkflow = async ( + params: ScheduledPostPublishParams, +) => { + const workflowId = getScheduledPostPublishWorkflowId(params); + const handle = await getWorkflowHandle(workflowId); + + if (!handle) { + return; + } + + const description = await getDescribeOrError(handle); + + if (description?.status.name === 'RUNNING') { + return await handle.terminate(); + } +}; diff --git a/src/temporal/notifications/workflows.ts b/src/temporal/notifications/workflows.ts index 1ea118e800..6e2a25b258 100644 --- a/src/temporal/notifications/workflows.ts +++ b/src/temporal/notifications/workflows.ts @@ -2,6 +2,7 @@ import { proxyActivities } from '@temporalio/workflow'; import { BookmarkActivities, type createActivities } from './activities'; import type { entityReminderSchema } from '../../common/schema/reminders'; import type z from 'zod'; +import type { ScheduledPostPublishParams } from '../../common/postScheduling'; export interface BookmarkReminderParams { userId: string; @@ -35,3 +36,15 @@ export const entityReminderWorkflow = async ( await sendEntityReminder(params); }; + +export const scheduledPostPublishWorkflow = async ( + params: ScheduledPostPublishParams, +): Promise => { + const { publishScheduledPost } = proxyActivities< + ReturnType + >({ + scheduleToCloseTimeout: '15s', + }); + + await publishScheduledPost(params); +}; diff --git a/src/workers/cdc/primary.ts b/src/workers/cdc/primary.ts index d30bd6b47f..778a0922c4 100644 --- a/src/workers/cdc/primary.ts +++ b/src/workers/cdc/primary.ts @@ -128,8 +128,10 @@ import { counters } from '../../telemetry'; import { cancelEntityReminderWorkflow, cancelReminderWorkflow, + cancelScheduledPostPublishWorkflow, runEntityReminderWorkflow, runReminderWorkflow, + runScheduledPostPublishWorkflow, } from '../../temporal/notifications/utils'; import { addDays, differenceInMonths, nextMonday, nextTuesday } from 'date-fns'; import { hasPlusStatusChanged } from '../../paddle'; @@ -139,6 +141,7 @@ import { sourceReportReasonsMap, userReportReasonsMap, } from '../../entity/common'; +import { getPostScheduledAt } from '../../common/postScheduling'; import { utcToZonedTime } from 'date-fns-tz'; import { SourceReport } from '../../entity/sources/SourceReport'; import { @@ -1069,11 +1072,57 @@ const onSettingsChange = async ( } }; +const syncScheduledPostPublishWorkflow = async ( + data: ChangeMessage, +): Promise => { + const before = data.payload.before; + const after = data.payload.after; + const beforeScheduledAt = before ? getPostChangeScheduledAt(before) : null; + const afterScheduledAt = after ? getPostChangeScheduledAt(after) : null; + + if ( + before && + beforeScheduledAt && + (!afterScheduledAt || + beforeScheduledAt.getTime() !== afterScheduledAt.getTime() || + after?.visible || + after?.deleted || + after?.banned) + ) { + await cancelScheduledPostPublishWorkflow({ + postId: before.id, + scheduledAt: beforeScheduledAt.toISOString(), + }); + } + + if ( + after && + afterScheduledAt && + !after.visible && + !after.deleted && + !after.banned && + (!beforeScheduledAt || + beforeScheduledAt.getTime() !== afterScheduledAt.getTime()) + ) { + await runScheduledPostPublishWorkflow({ + postId: after.id, + scheduledAt: afterScheduledAt.toISOString(), + }); + } +}; + +const getPostChangeScheduledAt = (post: ChangeObject): Date | null => + getPostScheduledAt({ + flags: JSON.parse(post.flags || '{}') as Post['flags'], + }); + const onPostChange = async ( con: DataSource, logger: FastifyBaseLogger, data: ChangeMessage, ): Promise => { + await syncScheduledPostPublishWorkflow(data); + if (data.payload.after?.yggdrasilId && !data.payload.before?.yggdrasilId) { await notifyPostYggdrasilIdSet(logger, data.payload.after); } @@ -1086,7 +1135,7 @@ const onPostChange = async ( if (isFreeformPostLongEnough(freeform)) { await notifyFreeformContentRequested(logger, freeform); } - if (data.payload.after!.authorId) { + if (data.payload.after!.visible && data.payload.after!.authorId) { await checkAchievementProgress( con, logger, @@ -1145,6 +1194,17 @@ const onPostChange = async ( if (data.payload.after!.visible) { if (!data.payload.before!.visible) { await notifyPostVisible(logger, data.payload.after!); + if ( + data.payload.after!.type === PostType.Freeform && + data.payload.after!.authorId + ) { + await checkAchievementProgress( + con, + logger, + data.payload.after!.authorId, + AchievementEventType.PostFreeform, + ); + } } else { // Trigger message only if the post is already visible and the conte was edited const freeform = data as ChangeMessage;